Job market analytics with history (Yandex Data Lake + Power BI)

I once wrote the article "Market analyst profession in three clicks". In it, we connected Power BI to the hh.ru website, pulled in vacancies, and built a dashboard for analysing them.

But the connection described in that article leaves the vacancy analysis without any history. The dashboard stores only one "snapshot" of the data, taken at the moment it is loaded. In other words, as soon as you press Refresh, all the data in the dashboard is overwritten. Would it be interesting to see the whole picture over time? For me, yes. Moreover, it is not particularly difficult to do, and it costs a couple of hundred rubles a year. When choosing the tools, the principle was: as simple and cheap as possible.

To add history to the dashboard, let's turn to cloud technologies. We need a place where we can accumulate and store our data "snapshots"; moreover, the raw responses from the site come in JSON format. Power BI will then connect to this storage. A small diagram of the architecture is shown in Figure 1. The algorithm is as follows: a Cloud Function in Python runs on a schedule, makes requests to the site's API, and saves the responses to Object Storage. Next, the raw responses are transformed and saved in parquet format, and Power BI connects to that file. A bonus of this connection method is that you can schedule refreshes in your Power BI personal account (not every source allows scheduled refresh).
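
To get a feel for what the function will be working with, here is a minimal sketch of the kind of request it makes against the hh.ru API (the query text "data engineer" is only an illustration; the real queries come from the requests.csv file described below):

import requests

# Search vacancies by name via the public hh.ru API (the same endpoint the functions use)
resp = requests.get("https://api.hh.ru/vacancies",
                    params={"text": "data engineer", "search_field": "name"})
resp.raise_for_status()
body = resp.json()
print(body["pages"])                 # number of result pages for this query
for item in body["items"]:
    print(item["id"], item["name"])  # vacancy ids are what Function 1 collects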

I decided to use the Yandex Cloud infrastructure. In my opinion, the clouds of Western vendors, such as Microsoft Azure, are more mature and convenient, but the problem is that registering there requires a bank card, which would probably cut off most readers.

Fig. 1 – Architecture

Now, step by step.
Go to Yandex Cloud, register, link a bank card, and create two services:

  • Yandex Cloud Functions

  • S3 storage, aka Data Lake, aka Yandex Object Storage

Yandex Object Storage

In Yandex Cloud, you need to create a directory (folder), then go to Object Storage and create a bucket. I set the bucket's access to Public and didn't bother with fine-grained permissions. You also need to create a static access key for the bucket. Follow the instructions from Yandex.
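
Before going further, it is worth checking that the static key actually works. A minimal sketch from any machine with boto3, assuming the key, secret and bucket name are substituted with your own:

import boto3

# Connect to Yandex Object Storage through its S3-compatible endpoint
s3 = boto3.client('s3',
                  aws_access_key_id='YOUR KEY',
                  aws_secret_access_key='YOUR SECRET KEY',
                  region_name='central1',
                  endpoint_url='https://storage.yandexcloud.net')

# List the first objects in the bucket (an empty result for a fresh bucket is fine)
resp = s3.list_objects_v2(Bucket='YOUR BUCKET', MaxKeys=10)
for obj in resp.get('Contents', []):
    print(obj['Key'])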

Yandex Cloud Functions

This is the hardest part, but hang in there :-). In Cloud Functions, you need to create three functions with a Python runtime; the function code is attached below. Set the function timeout to at least 300 seconds, and create triggers for them spaced at least 30 minutes apart. Mine ended up as in Figure 2:

Fig. 2 – Function triggers

In the code editor for each function, create two files (the code is attached below). The first is requirements.txt with the required Python packages; the second contains the Python code. In the code, replace the marked values (YOUR KEY, YOUR SECRET KEY, YOUR BUCKET, the path to requests.csv) with your own; they are easy to find.

Function code
Function 1
def handler(event, context):
    import requests
    import datetime
    import boto3
    import json
    import pandas as pd
    import csv
    import io
    from io import StringIO

    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'YOUR KEY',
            aws_secret_access_key = 'YOUR SECRET KEY',
            region_name="central1",
            endpoint_url="https://storage.yandexcloud.net"
            )
        return os
    def get_maxpage(API):
        adress = API
        todos = requests.get(adress)
        stat = todos.status_code
        tbody  = json.loads(todos.text)
        maxpage = tbody["pages"]
        return maxpage

    def getAPI(text, API, page):
        # Fetch one page of search results and collect the vacancy ids
        adress = API + '&page=' + page
        todos = requests.get(adress)
        stat = todos.status_code
        todos.encoding = 'utf-8'
        tbody = todos.json()
        df = pd.DataFrame(columns=['id', 'text', 'page'])
        for item in tbody["items"]:
            row = pd.DataFrame([{'id': item['id'], 'text': text, 'page': page}])
            df = pd.concat([df, row], ignore_index=True)
        return {
            'statusCode': 200,
            'body': df,
        }
    
    def get_list_req(os):
        obj = os.get_object(Bucket="YOUR BUCKET", Key='PATH TO THE requests.csv FILE')
        body = obj['Body'].read()
        df = pd.read_csv(io.BytesIO(body))
        return df

    os = create_con() # Connect to Object Storage

    list_req = get_list_req(os) # Read the dataframe of search queries
    df = pd.DataFrame(columns=['id', 'text', 'page'])
    for index, row in list_req.iterrows(): # Loop over the queries and collect vacancy ids
        maxpage = int(get_maxpage(row['API'])) # number of result pages for this query
        for i in range(0, maxpage):
            req = getAPI(row['text'], row['API'], str(i))
            df = pd.concat([df, req["body"]], ignore_index=True)
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    os.put_object(Bucket="YOUR BUCKET", Body=csv_buffer.getvalue(), Key="Lakehouse/Config/" + "All_id_for_parse.csv", StorageClass="STANDARD")
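
If you want to test Function 1 outside the cloud, the handler can be called directly; a small sketch you can temporarily append to the bottom of the same file on your machine (it assumes the key, secret and paths above are already filled in):

# Local smoke test only, do not deploy this part to the Cloud Function
if __name__ == '__main__':
    handler(None, None)  # the event and context arguments are not used by the code above
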
Function 2
def handler(event, context):
    import requests
    import datetime
    import boto3
    import json
    import pandas as pd
    import csv
    import io
    from io import StringIO

    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'YOUR KEY',
            aws_secret_access_key = 'YOUR SECRET KEY',
            region_name="central1",
            endpoint_url="https://storage.yandexcloud.net"
            )
        return os

    def getAPI(text, id):
        adress="https://api.hh.ru/" + 'vacancies/' + str(id)
        todos = requests.get(adress)
        stat = todos.status_code
        todos.encoding = 'utf-8'
        todos.text
        tbody = todos.json()
        t2body = json.dumps(tbody, ensure_ascii=False)
        now = datetime.date.today()
        file_key = 'Lakehouse/Data/raw/' + now.strftime('%Y-%m-%d') + '/' + text + '/' + str(id) + '.json'
        os.put_object(Bucket="YOUR BUCKET", Body=t2body, Key=file_key, StorageClass="STANDARD")
    
    def get_list_req(os):
        obj = os.get_object(Bucket="YOUR BUCKET", Key='Lakehouse/Config/All_id_for_parse.csv')
        body = obj['Body'].read()
        df = pd.read_csv(io.BytesIO(body))
        return df

    os = create_con() # Connect to Object Storage

    list_req = get_list_req(os) # Read the dataframe of vacancy ids
    for index, row in list_req.iterrows(): # Loop over the ids and save the raw JSON responses
        getAPI(row['text'], row['id'])
Function 3
def handler(event, context):
    import pandas as pd
    import requests
    from datetime import datetime, date
    import boto3
    import json
    import io
    from io import StringIO
    from io import BytesIO
    import pyarrow, fastparquet
    import urllib3
    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    mydate = str(date.today())
    data=[]
    columns=['id', 'name', 'salary', 'experience', 'schedule', 'employment', 'key_skills', 'employerName', 'accept_temporary',\
                                     'publishedAt', 'arhived', 'areaId', 'areaName', 'alternate_url', 'working_days', 'working_time_intervals', 'DateLoad', 'fileKey']
    df = pd.DataFrame(data, columns=columns)
                      
    df3 = df
    df_mart = df
    def create_con():
        os = boto3.client('s3',
            aws_access_key_id = 'YOUR KEY',
            aws_secret_access_key = 'YOUR SECRET KEY',
            region_name="central1",
            endpoint_url="https://storage.yandexcloud.net",
            verify=False
            )
        return os
    
    def get_all_s3_keys(os, bucket, prefix):
        """Get a list of all keys in an S3 bucket."""
        df = pd.DataFrame(columns=['file_key'])
        kwargs = {'Bucket': bucket, 'Prefix': prefix}
        while True:
            resp = os.list_objects_v2(**kwargs)
            for obj in resp['Contents']:
                df = pd.concat([pd.DataFrame([[obj['Key']]], columns=df.columns), df], ignore_index=True)
            try:
             kwargs['ContinuationToken'] = resp['NextContinuationToken']
            except KeyError:
                break   
        return df 

    def go_file(os, Key):
        obj = os.get_object(Bucket="YOUR BUCKET", Key=Key) # Fetch the raw JSON file
        todos = json.loads(obj['Body'].read())
        experience = schedule = employment = None  # defaults when a field is null
        if todos["experience"] is not None:
            experience = todos["experience"]["id"]
        if todos["schedule"] is not None:
            schedule = todos["schedule"]["id"]
        if todos["employment"] is not None:
            employment = todos["employment"]["id"]
        try:
            employerName = todos["employer"]["name"]
        except:
            employerName = None
        try:
            areaId = todos["area"]["id"]
            areaName = todos["area"]["name"]
        except:
            areaId = None
            areaName = None
        row = [todos["id"], todos["name"], todos["salary"], experience, schedule, employment, todos["key_skills"], employerName, todos["accept_temporary"],\
                todos["published_at"],\
               todos["archived"], areaId, areaName, todos["alternate_url"], todos["working_days"], todos["working_time_intervals"], datetime.now(), Key]
        return pd.DataFrame([row], columns=columns)
    
    os = create_con() # Connect to Object Storage
    df_list_file_key = get_all_s3_keys(os, 'YOUR BUCKET', 'Lakehouse/Data/raw/' + mydate) # List all raw file keys for today
    for ind in df_list_file_key.index:
            df_res = go_file(os, df_list_file_key['file_key'][ind])
            df3 = pd.concat([df3, df_res])
    out_buffer = BytesIO()
    try:
        obj_mart = os.get_object(Bucket="YOUR BUCKET", Key='Lakehouse/Data/proccessed/datamart') # Read the existing parquet data mart
        df_mart = pd.read_parquet(io.BytesIO(obj_mart['Body'].read()))
    except:
        pass # First run: the data mart does not exist yet
    df_datamart = pd.concat([df_mart,df3]).drop_duplicates(['fileKey'],keep='last').sort_values('fileKey')
    df_datamart.astype(str).to_parquet(out_buffer, index=False)
    os.put_object(Bucket="YOUR BUCKET", Body=out_buffer.getvalue(), Key='Lakehouse/Data/proccessed/datamart', StorageClass="STANDARD")
    return 'OK'
Contents of requirements.txt

boto3
datetime
pandas
fsspec
requests
pyarrow
fastparquet
urllib3

Contents of the requests.csv file
"number","text","API"
"1","Azure","https://api.hh.ru/vacancies?text=Azure&search_field=name"
"2","data scientist","https://api.hh.ru/vacancies?text=data scientist&search_field=name"
"3","data engineer","https://api.hh.ru/vacancies?text=data engineer&search_field=name"

The requests.csv file stores the list of search queries to parse. It must be placed in YOUR BUCKET – Lakehouse – Config – requests.csv.
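
The file can be uploaded through the Object Storage web console or, for example, with the same boto3 client; a sketch with the usual placeholder credentials:

import boto3

s3 = boto3.client('s3',
                  aws_access_key_id='YOUR KEY',
                  aws_secret_access_key='YOUR SECRET KEY',
                  region_name='central1',
                  endpoint_url='https://storage.yandexcloud.net')

# Put the local requests.csv where Function 1 expects to find it
with open('requests.csv', 'rb') as f:
    s3.put_object(Bucket='YOUR BUCKET',
                  Key='Lakehouse/Config/requests.csv',
                  Body=f.read(),
                  StorageClass='STANDARD')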

Function 1 reads the requests.csv file from the bucket (the list of searches for which we want to collect hh.ru responses) and writes the vacancy ids to the All_id_for_parse.csv file. Pythonistas, please don't judge the code too harshly :). Function 2 then requests information from hh.ru for each vacancy id separately. Two steps are needed because requesting each vacancy by its id returns a much more detailed JSON than the general search results used in function 1. Function 3 builds the data mart, the datamart file in parquet format, with the data extracted from the JSON files. This is the file Power BI will need.
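
For reference, the fields that Function 3 pulls out of each raw response look roughly like this (a heavily truncated, illustrative sketch; a real hh.ru response contains many more nested fields, and the values here are made up):

# Truncated, illustrative sketch of a single vacancy JSON as Function 3 sees it
vacancy = {
    "id": "12345678",
    "name": "Data engineer",
    "salary": None,                          # or a nested dict with the salary range
    "experience": {"id": "between1And3"},
    "schedule": {"id": "fullDay"},
    "employment": {"id": "full"},
    "key_skills": [{"name": "Python"}, {"name": "SQL"}],
    "employer": {"name": "Some Company"},
    "accept_temporary": False,
    "published_at": "2022-10-01T10:00:00+0300",
    "archived": False,
    "area": {"id": "1", "name": "Москва"},
    "alternate_url": "https://hh.ru/vacancy/12345678",
    "working_days": [],
    "working_time_intervals": [],
}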

Instead of three functions, you could get away with two by using the new Yandex Query service. But at the moment that service cannot read nested JSON, which is exactly what ours is.

If everything went well and the functions ran successfully, you can move on to Power BI.

Power BI

To load the data mart into Power BI, use the File – Parquet source. In the source, specify a link to the file, like
"http://YOURBUCKET.storage.yandexcloud.net/Lakehouse/Data/proccessed/datamart".
In the datamart file, some fields (dictionaries and lists) are stored as text, so you will need to apply a transformation in Power BI. In Power Query this takes a couple of clicks. That's basically it: you can now build a dashboard. My dashboard is open and available by link.
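
Before building visuals, it is easy to check from Python that the public file is readable over HTTP; a sketch, assuming your bucket name is substituted into the URL:

import io
import requests
import pandas as pd

# Download the data mart from the public bucket URL and open it with pandas
url = "http://YOURBUCKET.storage.yandexcloud.net/Lakehouse/Data/proccessed/datamart"
resp = requests.get(url)
resp.raise_for_status()
df = pd.read_parquet(io.BytesIO(resp.content))
print(df.shape)
print(df[['id', 'name', 'areaName', 'publishedAt']].head())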

Outcome

We ended up with a whole project: Data Lake + BI. It can be used to quickly deploy demos, and it is cheap. Of course, the solution is aimed at small and possibly medium volumes of data. At the moment the numbers are:

  • 700 MB of raw JSON data accumulated in 15 days

  • those 700 MB of raw data boil down to 0.85 MB in the parquet data mart

  • the data mart grows by about 21 MB per year, so the pipeline can live for 10 years (a quick estimate follows this list)

  • it costs 84 kopecks a day, about 295 rubles a year

  • loading is fully automated: just open the report and you see the new data
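
The growth estimate above is simple arithmetic; spelled out, assuming the 15-day figures stay representative:

# Back-of-the-envelope projection of data mart growth
mart_15_days_mb = 0.85                    # data mart size produced from 15 days of raw data
per_year_mb = mart_15_days_mb / 15 * 365  # ~20.7 MB per year, i.e. roughly 21 MB
print(round(per_year_mb, 1))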
