Who Is an Analytics Engineer – An End-to-End Solution Using bash + dbt + Looker

Hey! My name is Artemy Kozyr and I’m an Analytics Engineer at Wheely.

We could have a long and tedious discussion about what an Analytics (Data/Backend) Engineer is, which tools they should master, and which buzzwords are trending and valued on a CV. In my opinion, however, it is much more interesting to look at the process and results of this work through the lens of a specific applied task.

In this post:

  • What does an end-to-end solution mean, and what is its value?

  • Organizing Extract & Load from the asynchronous MaestroQA API

  • Modeling data marts with dbt

  • Delivering value to users with Looker

End-to-End Solution – From Idea to Value Creation

In a nutshell, end-to-end means delivering a complete, working solution, with every piece of the puzzle in place.

I propose to move on to a real scenario: working with MaestroQA, an application that automates the monitoring and evaluation of customer service (Customer Support).

One of the most important ideas is that the customer, whoever they are (a Manager, a Product Owner, the CEO), almost never states the task in engineering terms:

  • Load a gazillion gigabytes into storage

  • Add multithreading to the code

  • Write a super-optimal query

  • Create 15 dbt models

Behind any engineering task lies the solution of specific business problems. For us these are:

  • Transparency of Customer Support (we record all assessments and incidents)

  • Performance in the palm of your hand (we track the dynamics of indicators over time)

  • Reporting on support team KPIs (aggregating indicators by team, city, country, etc.)

  • Collecting feedback and fixing errors (identifying weak/problem areas with a fast feedback loop)

  • Continuously studying and analyzing cases (categorizing topics, organizing trainings and reviews)

And this is the key focus that distinguishes an Analytics Engineer from, say, a classic Data Engineer or Backend Engineer. Armed with a full range of engineering skills and practices, an Analytics Engineer creates business value and solves applied problems, speaks the same language as the customer of the solution, and thinks in terms of business metrics.

Get the initial data – Extract & Load Data

Okay, now to the point. I chose MaestroQA deliberately: it is a source for which SaaS integration tools offer no ready-made connectors.

Therefore, we have to implement the integration and put it on a schedule ourselves.

The choice of tools for data integration is largely a matter of taste, but I prefer to use simple shell scripts and orchestrate them using Airflow.

1. Let’s start by studying the documentation for the service API:

  • We have a number of methods available: request-raw-export, request-groups-export, request-audit-log-export, get-export-data

  • The methods take a set of parameters: apiToken, startDate, endDate, exportId

  • The resulting reports are generated asynchronously

An asynchronous API means that in response to a request for an export you receive not the export itself but a ticket number in the queue. Presenting this number to another method (get-export-data), you receive the export as soon as it is ready.

Using the async API complicates the task somewhat, since we have to call several methods and keep track of the exportId, but it also makes the task more interesting.
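
To make this concrete, here is roughly the shape of the responses that the scripts below rely on (the field names exportId, status and dataUrl are the ones the scripts parse; the values are placeholders):

# request-raw-export / request-groups-export  ->  {"exportId": "abc123"}
# get-export-data, while still running        ->  {"status": "in progress"}
# get-export-data, once finished              ->  {"status": "complete", "dataUrl": "https://..."}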

2. Get API Token.

This is a secret key that will allow you to download the data associated with your account.

3. Prepare the export scripts.

Step 1. Request a raw data export (total_scores.sh):

  • Make a request to the API (JSON_DATA)

  • Get the exportId (our ticket in the queue)

  • Check the exportId and, if it looks valid, proceed to retrieving the result (retrieve.sh)

# 1. Prepare request
JSON_DATA=$(jq -n \
              --arg maestroqa_token "$MAESTROQA_TOKEN" \
              --arg start_date "$START_DATE" \
              --arg end_date "$END_DATE" \
              --arg single_file_export "$SINGLE_FILE_EXPORT" \
              --arg name "$FILE_NAME" \
              '{apiToken: $maestroqa_token, startDate: $start_date, endDate: $end_date, singleFileExport: $single_file_export, name: $name }' )

# 2. Get exportId
EXPORT_ID=$(curl -s -X POST $ENDPOINT \
    -H 'Content-Type: application/json' \
    -d "${JSON_DATA}" \
    | jq -r '.exportId')

# 3. Retrieve data by exportId
if [ -z "$EXPORT_ID" ] || [ "$EXPORT_ID" = "null" ]  # jq -r prints "null" when the field is missing
then
      echo "EXPORT_ID is empty"
      exit 1
else
      echo "EXPORT_ID=$EXPORT_ID"
      EXPORT_ID=$EXPORT_ID bash retrieve.sh
fi

Step 2. Retrieve the finished export (retrieve.sh):

  • Define a function that queries the export readiness status (get_status())

  • Make a request to the API (JSON_DATA)

  • Poll the API for the readiness status every 10 seconds

  • Once the status is complete, fetch the results file and save it to S3

# 1. function used to poll to get current status
get_status() {
    curl -s -X GET $RETRIEVE_ENDPOINT \
    -H 'Content-Type: application/json' \
    -d "${JSON_DATA}" \
    | jq -r '.status' \
    | cat
}

# 2. prepare request
JSON_DATA=$(jq -n \
              --arg maestroqa_token "$MAESTROQA_TOKEN" \
              --arg export_id "$EXPORT_ID" \
              '{apiToken: $maestroqa_token, exportId: $export_id }' )

# 3. get current status ("in progress" / "complete")
STATUS="$(get_status)"
printf "STATUS=$STATUS\n"

# 4. poll every 10 seconds
while [ "$STATUS" != "complete" ]; do
  printf "STATUS=$STATUS\n"
  sleep 10
  STATUS="$(get_status)"
done

# 5. Store resulting file to S3
curl -s -X GET $RETRIEVE_ENDPOINT \
  -H 'Content-Type: application/json' \
  -d "${JSON_DATA}" \
  | jq -r '.dataUrl' \
  | xargs curl -s \
  | aws s3 cp - s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv

echo "UPLOADED TO s3://$BUCKET/$BUCKET_PATH/$FILE_NAME/$FILE_NAME-$START_DATE-to-$END_DATE.csv"
4. Automate with Airflow.

Great! Once the export works in manual mode, we need to put it on a schedule. The most convenient way to do this is with Airflow, which gives us retries, monitoring, and notifications.

Example DAG:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

import os
import yaml
from datetime import datetime, timedelta
from slack.notifications import failed_task_slack_notification

### INIT DAG
DAG_NAME = "maestroqa_api"
SCHEDULE_INTERVAL = '0 0 * * *'

DAG_PATH = os.path.dirname(__file__)
CONFIG_FILE_NAME = "config.yml"
CONFIG_PATH = os.path.join(DAG_PATH, CONFIG_FILE_NAME)
CONFIG = yaml.safe_load(open(CONFIG_PATH))["endpoints"]

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2022, 2, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=3),
    "on_failure_callback": failed_task_slack_notification,
}

dag = DAG(
    DAG_NAME,
    default_args=default_args,
    schedule_interval=SCHEDULE_INTERVAL,
    # catchup and dagrun_timeout are DAG-level settings, not task defaults
    catchup=False,
    dagrun_timeout=timedelta(minutes=5),
)

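# These Jinja templates are rendered at task run time because "env" is a
# templated field of BashOperator; the operators below merge os.environ
# (including the two variables set here) into their env dict.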
os.environ["START_DATE"] = "{{ execution_date.isoformat() }}"
os.environ["END_DATE"] = "{{ next_execution_date.isoformat() }}"

groups = BashOperator(
            task_id=CONFIG['groups']['FILE_NAME'],
            bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['groups']['FILE_NAME']}.sh ",
            env={ **os.environ.copy(), **CONFIG['groups'] },
            trigger_rule="all_done",
            dag=dag
    )

total_scores = BashOperator(
            task_id=CONFIG['total_scores']['FILE_NAME'],
            bash_command=f"cd {DAG_PATH}/ && bash {CONFIG['total_scores']['FILE_NAME']}.sh ",
            env={ **os.environ.copy(), **CONFIG['total_scores'] },
            trigger_rule="all_done",
            dag=dag
    )

groups >> total_scores
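
The DAG reads its per-task settings from a config.yml placed next to the DAG file. A sketch of what that file might contain is shown below: only the endpoints / groups / total_scores / FILE_NAME keys are implied by the code above, the remaining keys simply mirror the environment variables consumed by the shell scripts, and the URLs are placeholders (the API token itself would normally come from a secret store rather than this file):

endpoints:
  groups:
    FILE_NAME: groups
    ENDPOINT: "<request-groups-export URL>"
    RETRIEVE_ENDPOINT: "<get-export-data URL>"
    BUCKET: "<s3-bucket>"
    BUCKET_PATH: maestroqa
  total_scores:
    FILE_NAME: total_scores
    ENDPOINT: "<request-raw-export URL>"
    RETRIEVE_ENDPOINT: "<get-export-data URL>"
    SINGLE_FILE_EXPORT: "true"
    BUCKET: "<s3-bucket>"
    BUCKET_PATH: maestroqa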

Let’s model data marts – Transform Data

By this stage, we are uploading new data daily and the files are accumulating in S3.

Register files in S3 as EXTERNAL TABLES

To access this data with plain SELECT queries, we register the files in S3 as external tables. The dbt-labs/dbt_external_tables package helps us here:

version: 2

sources:

  - name: maestroqa
    database: wheely
    schema: spectrum
    tags: ["sources", "maestroqa"]
    loader: Airflow (S3 via External Tables)
    description: "MaestroQA – customer service quality assurance software"

    tables:

        - name: groups
          identifier: maestroqa_groups
          description: "Agent Groups"
          external:
            location: "s3://{{ var('s3_bucket_name') }}/maestroqa/GROUPS/"
            row_format: serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
            table_properties: "('skip.header.line.count'='1')"
          columns:
            - name: group_name
              data_type: varchar
              description: "Group / Team name: All Agents, UK Team, Ru Team, FR Team, UK Team"
            - name: group_id
              data_type: varchar
            - name: agent_name
              data_type: varchar
            - name: agent_email
              data_type: varchar
            - name: agent_ids
              data_type: varchar
              description: "List of semicolon separated Agent IDs. Used to link with MaestroQA Total Scores table"
            - name: available
              data_type: bool
              description: "Flag indicating if agent is available at the moment"

The resulting EXTERNAL TABLES are used in dbt as sources.

Data Mart Modeling

Let’s take a look at the raw data that comes back from the API. Two types of files are exported:

  • Groups – a directory of agents and teams

  • Scores – facts about the evaluation and scoring of communications

Pay special attention to the agent_ids column, which contains an array of identifiers. This attribute takes some extra work: the array has to be split into its elements so that the table becomes flat, with surrogate keys added.
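
A minimal sketch of one way to do this in Redshift-flavoured SQL, using SPLIT_PART over a small numbers relation (the source and column names follow the definitions above, but the approach itself is an assumption; it also assumes the source has at least as many rows as the longest list, and a dedicated numbers model would be cleaner in practice):

WITH numbers AS (
    -- up to 100 positions per agent_ids list; raise the LIMIT if groups are larger
    SELECT ROW_NUMBER() OVER () AS n
    FROM {{ source('maestroqa', 'groups') }}
    LIMIT 100
)

SELECT
      groups.group_id
    , groups.group_name
    , SPLIT_PART(groups.agent_ids, ';', numbers.n::INT) AS agent_id
FROM {{ source('maestroqa', 'groups') }} AS groups
CROSS JOIN numbers
WHERE SPLIT_PART(groups.agent_ids, ';', numbers.n::INT) <> ''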

Our task is to assemble a wide data mart that we can later use to answer any question. To do this, we join the tables:

{{
    config (
      materialized='table',
      dist="auto",
      sort=['graded_dt', 'country'],
      tags=['maestroqa']
    )
}}

SELECT

    -- IDs
      scores.gradable_id
    , scores.agent_id

    -- dimensions
    , scores.grader
    , scores.agent_name
    , scores.agent_email
    , groups.group_name
    , CASE groups.group_name
        WHEN 'Ru Team' THEN 'RU'
        WHEN 'FR Team' THEN 'FR'
        WHEN 'UK Team' THEN 'GB'
        WHEN 'All Agents' THEN 'All'
      END AS country

    -- dates
    , scores.date_graded::DATE AS graded_dt

    -- measures
    , scores.rubric_score
    , scores.max_rubric_score

FROM {{ ref('stg_maestroqa_total_scores') }} AS scores
    LEFT JOIN {{ ref('flt_maestroqa_groups') }} AS groups
        ON groups.agent_id = scores.agent_id
        AND groups.group_name IN ('All Agents', 'Ru Team', 'FR Team', 'UK Team')

The resulting dbt model dependency graph (DAG) looks like this:
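
(The graph screenshot is not reproduced here. Based on the refs in the query above, the lineage runs roughly as follows; any intermediate model names beyond the two referenced are assumptions.)

maestroqa.total_scores (source)  ->  stg_maestroqa_total_scores  ->  wide scores mart (the model above)
maestroqa.groups (source)        ->  ...  ->  flt_maestroqa_groups  ->  wide scores mart (the model above)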

We will provide access to data through BI – Deliver value

Great: by this stage, in addition to the set of files in S3, we have a constantly updated wide table (data mart) in the DBMS that we can query to answer practically any question.

However, not all users are equally comfortable formulating their questions in pure SQL. This is exactly the barrier that BI tools are designed to remove; their main tasks are to:

  • Provide a visual query builder

  • Define a set of dimensions, metrics, and filters for business users to reuse

  • Group visualizations and answers into custom dashboards

  • Set up scheduled data delivery and rule-based notifications

In Looker, each column of the source table can be declared as a dimension or a measure, given aggregation rules, and documented with a description. This is done in LookML:
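
Below is a minimal sketch of what such a LookML view might look like; the view and table names are hypothetical, while country, graded_dt and rubric_score come from the data mart above:

view: maestroqa_scores {
  sql_table_name: analytics.f_maestroqa_scores ;;  # hypothetical schema.table

  dimension: country {
    type: string
    sql: ${TABLE}.country ;;
    description: "Support team country derived from group_name"
  }

  dimension_group: graded {
    type: time
    timeframes: [date, week, month]
    sql: ${TABLE}.graded_dt ;;
  }

  measure: avg_rubric_score {
    type: average
    sql: ${TABLE}.rubric_score ;;
    description: "Average QA rubric score"
  }
}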

Any user can then use the visual query builder to assemble answers to their own questions; in Looker this is called an Explore.

Ready-made tiles can be grouped into dashboards, shared with all interested users, and configured to send notifications when metrics cross thresholds.

All of this is only the tip of the iceberg in terms of the work done, but it is at this stage that the main value and benefit for the company are created.

Ability to build complex solutions that meet business needs

This is what hiring managers want to see. The market needs generalists and multi-instrumentalists who can work autonomously, solve problems independently, and create business value, now more than ever.

These are exactly the aspects I kept in mind while working on the Analytics Engineer and Data Engineer course programs at OTUS.

These are not just sets of lessons on separate topics, but a single, coherent story with the emphasis on understanding customer needs. In the live sessions my colleagues and I share our experience and real cases:

  • Advanced modeling in dbt

  • Deployment and features of working with BI tools

  • Analytic patterns and SQL

  • Cases: End-to-end analytics, Company’s KPI, Timeseries analysis

I also share my observations, experience, and practices in my Telegram channel, Technology Enthusiast.

Leave a comment if you have faced the need to build similar solutions: which approach did you use?

Thank you!
