Currency Rates and Analytics – Using Exchange Rates in the Data Warehouse

Hi! Artemiy here, Analytics Engineer at Wheely.

Today I'd like to talk about converting financial metrics into different currencies. The topic is quite relevant: many companies operate in several countries, build analytics on a global scale, and prepare reports according to international standards.

Using Wheely as a case study, I'll show how this problem can be solved with modern approaches:

  • Expanding the list of base currencies

  • Regular updates with up-to-date exchange rates

  • Keeping historical figures correct

  • Maximum convenience in analytics tools

Read on for a walkthrough of how we handle multi-currency metrics and figures using Open Exchange Rates, Airflow, Redshift Spectrum and dbt.

New requirements for the exchange rate service

The legacy source was the web service of the Central Bank of the Russian Federation. However, as requirements changed and the company expanded into new regions, it was no longer sufficient. For example, it has no AED (UAE dirham) quotes. Some companies may also need cryptocurrency rates such as BTC and ETH, which the Central Bank web service does not provide either.

The new requirements can be summarized as follows:

  • Support for an extended set of base currencies that are not available in the Central Bank of the Russian Federation API

  • Getting the most up-to-date quotes, including intraday rates

  • Minimizing data transformations outside the Data Warehouse (ideally, none at all)

Matrix of new requirements for working with exchange rates

The tasks are easy to visualize as a matrix. The areas where support needs to be added are marked in red:

  • Integrating a new API for the existing currencies

  • Adding new base currencies to the data load

  • Obtaining retrospective (historical) data on the new currencies for past periods

  • Archiving the rates from the legacy source

The legacy application for loading currency rates produced a pivot table with a separate column for each currency pair. This is convenient when the set of currencies and column names is strictly fixed, but it becomes a headache when the list of currencies needs to grow.

I also wanted to get rid of all the table transformations and reshaping done in pandas before the data reached the warehouse. I follow the principle of applying all transformations (the T in ELT) in one place, and a wonderful tool, dbt, helps me with that.

Integration with a new data provider

As should be clear by now, we can't do without an external data provider, so let's look at one of the many currency rate providers: Open Exchange Rates (OXR).

The minimum plan we need, Developer, includes:

  • 10,000 requests per month (more than enough)

  • Hourly intraday rate updates

  • Wide range of base currencies, including cryptocurrencies

Available API methods:

To get the current exchange rates, we'll use the /latest.json API endpoint.

A simple request-response might look like this:
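The actual request and response are not reproduced here. The sketch below shows how one response could be flattened into a row. It assumes a payload with the three fields the Spectrum table reads later: timestamp, base, and a nested rates object. The sample values and the parse_latest helper are hypothetical:

```python
import json
from datetime import datetime, timezone

# Illustrative payload with the fields the external table reads later:
# a Unix "timestamp", a "base" currency and a nested "rates" object.
# The rate values are made up.
SAMPLE_RESPONSE = """
{
  "timestamp": 1613692800,
  "base": "USD",
  "rates": {"AED": 3.6731, "EUR": 0.8282, "GBP": 0.7146, "RUB": 73.9, "USD": 1}
}
"""

SYMBOLS = ("AED", "EUR", "GBP", "RUB", "USD")


def parse_latest(payload: str, symbols=SYMBOLS) -> dict:
    """Flatten a /latest.json-style payload into a single row.

    A currency missing from "rates" becomes None instead of raising,
    so a partial response still yields a row.
    """
    doc = json.loads(payload)
    # Epoch seconds -> UTC datetime (same idea as the SQL expression
    # timestamp 'epoch' + "timestamp" * interval '1 second')
    ts = datetime.fromtimestamp(doc["timestamp"], tz=timezone.utc)
    row = {"business_ts": ts, "business_dt": ts.date(), "base_currency": doc["base"]}
    for code in symbols:
        row[code.lower()] = doc["rates"].get(code)
    return row


row = parse_latest(SAMPLE_RESPONSE)
print(row["business_dt"], row["base_currency"], row["eur"])  # prints: 2021-02-19 USD 0.8282
```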

Scheduling in Airflow

To fetch up-to-date exchange rates regularly, I use Apache Airflow, the de facto standard for data orchestration, data engineering and pipeline management.

What the task graph (DAG) does:

  • Make an API request

  • Save the response (for example, under a unique key in S3)

  • Send a Slack notification on error

DAG configuration:

  • Base currencies for which rates are requested

  • A run schedule synchronized with the build of data marts in the Data Warehouse

  • Service access token
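The sketch below makes the three settings concrete, in Python since Airflow DAGs are written in Python. It defines a hypothetical config object and the per-run plan it produces: one request per base currency, each with a timestamped S3 key so every response lands as a new object. OxrDagConfig, plan_requests and the key layout are illustrative assumptions, not the actual DAG:

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class OxrDagConfig:
    """Hypothetical container for the DAG settings listed above."""
    base_currencies: tuple   # currencies to request rates from
    schedule: str            # cron string, aligned with the DWH data mart build
    symbols: str = "AED,EUR,GBP,RUB,USD"
    token_env_var: str = "OXR_TOKEN"   # env var holding the service access token
    s3_prefix: str = "dwh/currencies/"


def plan_requests(cfg: OxrDagConfig, run_ts: datetime) -> list:
    """Return one (query params, S3 key) pair per base currency for a run.

    The key embeds a timestamp formatted like the shell script's
    date +"%Y-%m-%d-%H-%M-%S-%Z", so each run writes new objects
    instead of overwriting earlier ones.
    """
    stamp = run_ts.strftime("%Y-%m-%d-%H-%M-%S-%Z")
    return [
        ({"base": base, "symbols": cfg.symbols}, f"{cfg.s3_prefix}{stamp}-{base}.json")
        for base in cfg.base_currencies
    ]


cfg = OxrDagConfig(base_currencies=("USD", "AED"), schedule="0 * * * *")
run = datetime(2021, 2, 19, 10, 0, 0, tzinfo=timezone.utc)
for params, key in plan_requests(cfg, run):
    print(params["base"], key)
```

In a real DAG, the schedule string would be passed to the DAG's scheduling argument: schedule_interval in Airflow 2.x before 2.4, schedule from 2.4 onward. The token would be read from the environment or an Airflow connection rather than kept in the config.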

The simplest DAG consists of one task with a simple shell script call:

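TS=`date +"%Y-%m-%d-%H-%M-%S-%Z"`  # unique suffix for the S3 key; OXR_API_URL is a placeholder for the elided URL
curl -H "Authorization: Token $OXR_TOKEN" "$OXR_API_URL/latest.json" \
  | aws s3 cp - "s3://<BUCKET>/dwh/currencies/$TS.json"  # upload step assumed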

This is what the output of the regularly executed script looks like in S3:

In normal operation, the service currently receives about 25 calls per day. The usage statistics look like this:
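This quick back-of-the-envelope check shows why 10,000 requests per month is "more than enough". At about 25 calls per day, even a 31-day month uses well under a tenth of the quota, which leaves headroom for one-off jobs. The sketch only restates the numbers from the text:

```python
# Monthly request budget: ~25 calls/day (from the text) against the
# Developer plan's 10,000 requests/month. A 31-day month is the worst case.
CALLS_PER_DAY = 25
MONTHLY_QUOTA = 10_000
DAYS_IN_LONGEST_MONTH = 31

monthly_calls = CALLS_PER_DAY * DAYS_IN_LONGEST_MONTH
share = monthly_calls / MONTHLY_QUOTA
# Requests left over, e.g. for a one-off backfill that costs one call per day of history
spare = MONTHLY_QUOTA - monthly_calls

print(f"{monthly_calls} calls/month, {share:.2%} of the quota, {spare} spare")
```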

Loading history for new currencies

Once regular loading of all the required currencies is in place, you can start building the history for the new base currencies (which obviously doesn't exist yet). This allows transaction amounts from previous periods to be converted into the new currencies.

Unfortunately, the Developer plan does not include calls to the /time-series.json endpoint, and upgrading to a more expensive plan just for this one-time task makes no sense.

Instead, let's use the /historical/*.json method and build the historical export by simply polling the API in a loop:

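d=${START_DATE:?set START_DATE=YYYY-MM-DD}  # start date and OXR_API_URL are placeholders, elided in the original
while [ "$d" != 2021-02-19 ]; do
 echo $d
 curl -H "Authorization: Token $TOKEN" "$OXR_API_URL/historical/$d.json?base=AED&symbols=AED,GBP,EUR,RUB,USD" > ./export/$d.json
 d=$(date -j -v +1d -f "%Y-%m-%d" $d +%Y-%m-%d)  # BSD/macOS date; with GNU date: date -d "$d + 1 day" +%Y-%m-%d
done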
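Here is the same backfill logic as a small Python sketch, which could be useful if the loop is ever moved into an Airflow task. It generates one /historical/*.json path per day, using the same base and symbols as the shell loop. The helper names are hypothetical, and the API root URL is deliberately left out:

```python
from datetime import date, timedelta


def backfill_dates(start: date, stop: date):
    """Yield dates from start up to, but not including, stop.

    Same bounds as the shell loop, which exits as soon as d equals
    the stop date, so 2021-02-19 itself is not requested.
    """
    d = start
    while d < stop:
        yield d
        d += timedelta(days=1)


def historical_path(d: date, base: str = "AED", symbols: str = "AED,GBP,EUR,RUB,USD") -> str:
    # Path relative to the API root; the root URL itself is intentionally omitted
    return f"historical/{d.isoformat()}.json?base={base}&symbols={symbols}"


paths = [historical_path(d) for d in backfill_dates(date(2021, 2, 15), date(2021, 2, 19))]
print(len(paths), paths[0])
```

The shell loop tests with !=. Using d < stop instead also guards against an endless loop if the start date is accidentally later than the stop date.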

The spike in usage raised questions from colleagues who also use the service, but it was a one-off:

Archiving of historical exchange rates

The entire history of exchange rates obtained from the legacy Central Bank of the Russian Federation source up to date X (the switch to the new provider) must be archived unchanged.

I want to keep every rate we have ever shown in our analytics tools exactly as it was, so that the amounts in business users' dashboards and reports do not change by a single penny.

To do this, I will upload the accumulated exchange rates for the entire historical period to the Data Lake. Specifically, I will:

  • Transform the legacy pivot table into a two-dimensional layout (one row per date and base currency)

  • Write the result to S3 in the columnar PARQUET format

Building the archive in S3 in PARQUET format
CREATE EXTERNAL TABLE spectrum.currencies_cbrf
STORED AS PARQUET
LOCATION 's3://<BUCKET>/dwh/currencies_cbrf/' AS
WITH base AS (
   -- one row per base currency that each legacy row is expanded into
   SELECT 'EUR' AS base_currency
   UNION ALL SELECT 'GBP'
   UNION ALL SELECT 'RUB'
   UNION ALL SELECT 'USD'
)
SELECT
   "day" AS business_dt
   ,b.base_currency
   ,CASE b.base_currency
       WHEN 'EUR' THEN 1
       WHEN 'GBP' THEN gbp_to_eur
       WHEN 'RUB' THEN rub_to_eur
       WHEN 'USD' THEN usd_to_eur
       ELSE NULL
     END AS eur
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_gbp
       WHEN 'GBP' THEN 1
       WHEN 'RUB' THEN rub_to_gbp
       WHEN 'USD' THEN usd_to_gbp
       ELSE NULL
     END AS gbp
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_rub
       WHEN 'GBP' THEN gbp_to_rub
       WHEN 'RUB' THEN 1
       WHEN 'USD' THEN usd_to_rub
       ELSE NULL
     END AS rub
   ,CASE b.base_currency
       WHEN 'EUR' THEN eur_to_usd
       WHEN 'GBP' THEN gbp_to_usd
       WHEN 'RUB' THEN rub_to_usd
       WHEN 'USD' THEN 1
       ELSE NULL
     END AS usd
FROM ext.currencies c
   CROSS JOIN base b

S3 now holds a static snapshot of every exchange rate ever used in our analytics applications, serialized into an optimized, compressed columnar format. If data marts and historical data ever need to be recalculated, I can easily reuse these rates.
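The CASE expressions in the archive query implement a simple rule. For each base currency, the column for that same currency is 1, and every other column comes from the legacy <base>_to_<quote> column. This Python sketch applies the same reshaping to a single legacy row with made-up sample values:

```python
CURRENCIES = ("eur", "gbp", "rub", "usd")


def unpivot_legacy_row(row: dict) -> list:
    """Turn one legacy row (one column per pair, e.g. gbp_to_eur) into one
    row per base currency with one column per quote currency."""
    out = []
    for base in CURRENCIES:
        rec = {"business_dt": row["day"], "base_currency": base.upper()}
        for quote in CURRENCIES:
            # Same rule as the CASE expressions: 1 on the diagonal,
            # otherwise the <base>_to_<quote> column (None if it is absent)
            rec[quote] = 1 if base == quote else row.get(f"{base}_to_{quote}")
        out.append(rec)
    return out


# Illustrative legacy row; only a few pair columns are filled in
legacy = {"day": "2021-02-18", "gbp_to_eur": 1.15, "eur_to_gbp": 0.87, "usd_to_rub": 74.0}
for rec in unpivot_legacy_row(legacy):
    print(rec)
```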

Accessing S3 data from the DWH via External Tables

Now for the most interesting part. From my analytical engine, Amazon Redshift, I want simple and fast access to the latest exchange rates so I can use them in my transformations.

The best solution is to create EXTERNAL TABLEs, which provide SQL access to data stored in S3. They can read semi-structured data such as JSON as well as binary formats such as AVRO, ORC and PARQUET, among others. This feature is called Redshift Spectrum, and it is closely related to the Amazon Athena SQL engine, which in turn is based on Presto.

CREATE EXTERNAL TABLE IF NOT EXISTS spectrum.currencies_oxr (
   "timestamp" bigint
   , base varchar(3)
   , rates struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>
)
ROW FORMAT SERDE ''  -- JSON SerDe class name elided in the original
LOCATION 's3://<BUCKET>/dwh/currencies/'

Note how the nested rates document is referenced by declaring the column with the struct data type.

Now let's add dbt's secret power to the task. The dbt-external-tables package automates the creation of EXTERNAL TABLEs and registers them as data sources:

version: 2

sources:
  - name: external
    schema: spectrum
    tags: ["spectrum"]
    loader: S3
    description: "External data stored in S3 accessed with Redshift Spectrum"
    tables:
      - name: currencies_oxr
        description: "Currency Exchange Rates fetched from OXR API"
        freshness:
          error_after: {count: 15, period: hour}
        loaded_at_field: timestamp 'epoch' + "timestamp" * interval '1 second'
        external:
          location: "s3://<BUCKET>/dwh/currencies/"
          row_format: "serde ''"  # JSON SerDe class name elided in the original
        columns:
          - name: timestamp
            data_type: bigint
          - name: base
            data_type: varchar(3)
          - name: rates
            data_type: struct<aed:float8, eur:float8, gbp:float8, rub:float8, usd:float8>

An important element is checking data timeliness with a source freshness test on the exchange rates. It keeps a finger on the pulse of fresh data arriving in the warehouse. Financial metrics must be calculated correctly and on time, and that is impossible without current rates.

If data falls behind, that is, more than 15 hours pass without fresh exchange rates, we immediately receive a Slack notification.
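This sketch makes the freshness rule from the source config concrete. It converts the epoch timestamp the same way loaded_at_field does, then compares its age with the 15-hour error_after threshold. It illustrates the logic only, since dbt performs the real check, and the function names are hypothetical:

```python
from datetime import datetime, timedelta, timezone

# Threshold taken from the source config: error_after {count: 15, period: hour}
ERROR_AFTER = timedelta(hours=15)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def loaded_at(epoch_seconds: int) -> datetime:
    """Python counterpart of timestamp 'epoch' + "timestamp" * interval '1 second'."""
    return EPOCH + timedelta(seconds=epoch_seconds)


def freshness_status(latest_epoch: int, now: datetime) -> str:
    """Return 'error' if the newest rate is older than ERROR_AFTER, else 'pass'."""
    age = now - loaded_at(latest_epoch)
    return "error" if age > ERROR_AFTER else "pass"


now = datetime(2021, 2, 19, 16, 0, tzinfo=timezone.utc)
print(freshness_status(1613692800, now))               # newest rate from 00:00 UTC, 16 hours old
print(freshness_status(1613692800 + 10 * 3600, now))   # newest rate from 10:00 UTC, 6 hours old
```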

For transparency and ease of use, we combine the historical data (archive) and the continuously arriving current rates (new API) into a single currencies model:

Combining historical and new data into a single reference table
{{
    config(
        materialized='table',
        dist='all',
        sort=["business_dt", "base_currency"]
    )
}}

with cbrf as (
  -- archived Central Bank rates, used up to the switch date
  select
      business_dt
    , null::timestamp as business_ts
    , base_currency
    , aed
    , eur
    , gbp
    , rub
    , usd
  from {{ source('external', 'currencies_cbrf') }}
  where business_dt <= '2021-02-18'
),
oxr_all as (
  select
      (timestamp 'epoch' + o."timestamp" * interval '1 second')::date as business_dt
    , (timestamp 'epoch' + o."timestamp" * interval '1 second') as business_ts
    , o.base as base_currency
    , o.rates.aed::decimal(10,4) as aed
    , o.rates.eur::decimal(10,4) as eur
    , o.rates.gbp::decimal(10,4) as gbp
    , o.rates.rub::decimal(10,4) as rub
    , o.rates.usd::decimal(10,4) as usd
    -- number the snapshots within each (base currency, day), newest first
    , row_number() over (
          partition by o.base, (timestamp 'epoch' + o."timestamp" * interval '1 second')::date
          order by o."timestamp" desc
      ) as rn
  from {{ source('external', 'currencies_oxr') }} as o
  where (timestamp 'epoch' + o."timestamp" * interval '1 second')::date > '2021-02-18'
),
oxr as (
  -- keep only the latest snapshot per base currency and day
  select
      business_dt
    , business_ts
    , base_currency
    , aed
    , eur
    , gbp
    , rub
    , usd
  from oxr_all
  where rn = 1
),
united as (
  select
      business_dt
    , business_ts
    , base_currency
    , aed
    , eur
    , gbp
    , rub
    , usd
  from cbrf
  union all
  select
      business_dt
    , business_ts
    , base_currency
    , aed
    , eur
    , gbp
    , rub
    , usd
  from oxr
)
select
    business_dt
  , business_ts
  , base_currency
  , aed
  , eur
  , gbp
  , rub
  , usd
from united
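The model's logic can be mirrored in a few lines of Python, for example to unit-test expectations about its output: archive rows up to 2021-02-18, then only the newest API snapshot per base currency and day. The field names follow the model. The sample rows and values are invented:

```python
from datetime import date, datetime, timezone

CUTOFF = date(2021, 2, 18)  # last day served from the CBRF archive (as in the model)


def latest_per_day(oxr_rows):
    """Keep the newest snapshot per (base_currency, business_dt) after CUTOFF.

    Mirrors row_number() over (partition by base, day order by ts desc) ... rn = 1.
    """
    best = {}
    for r in oxr_rows:
        if r["business_dt"] <= CUTOFF:
            continue  # these days come from the archive instead
        key = (r["base_currency"], r["business_dt"])
        if key not in best or r["business_ts"] > best[key]["business_ts"]:
            best[key] = r
    return list(best.values())


def build_currencies(cbrf_rows, oxr_rows):
    """Union archived rows up to CUTOFF with deduplicated API rows after it."""
    archived = [r for r in cbrf_rows if r["business_dt"] <= CUTOFF]
    united = archived + latest_per_day(oxr_rows)
    # Same ordering as the model's sort key
    return sorted(united, key=lambda r: (r["business_dt"], r["base_currency"]))


def at_hour(h):
    return datetime(2021, 2, 19, h, tzinfo=timezone.utc)


cbrf = [{"business_dt": date(2021, 2, 18), "business_ts": None, "base_currency": "USD", "rub": 74.5}]
oxr = [
    {"business_dt": date(2021, 2, 19), "business_ts": at_hour(9), "base_currency": "USD", "rub": 74.0},
    {"business_dt": date(2021, 2, 19), "business_ts": at_hour(10), "base_currency": "USD", "rub": 74.1},
]
for r in build_currencies(cbrf, oxr):
    print(r["business_dt"], r["base_currency"], r["rub"])
```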

The exchange rate reference table is physically copied to every node of the Redshift cluster and stored sorted by date and base currency to speed up queries.

Using Exchange Rates in Data Modeling

Overall, working with exchange rates has stayed very simple for the analysts and engineers who develop the Data Warehouse. All the details are hidden: the new API, access to external semi-structured JSON documents in S3, and merging with archived data. In your transformations, a simple join to the exchange rate table is enough:

    select
        -- price_details
          r.currency
        , {{ convert_currency('price', 'currency') }}
        , {{ convert_currency('discount', 'currency') }}
        , {{ convert_currency('insurance', 'currency') }}
        , {{ convert_currency('tips', 'currency') }}
        , {{ convert_currency('parking', 'currency') }}
        , {{ convert_currency('toll_road', 'currency') }}
    from {{ ref('requests') }} r
        left join {{ ref('stg_currencies') }} currencies on r.completed_dt_utc = currencies.business_dt
            and r.currency = currencies.base_currency
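The join above attaches at most one rate row to each request, because the currencies model holds a single row per date and base currency. This minimal Python sketch of that LEFT JOIN uses hypothetical request and rate records. A request with no matching rate keeps None (NULL) in the rate columns rather than being dropped:

```python
from datetime import date

RATE_COLUMNS = ("aed", "eur", "gbp", "rub", "usd")


def attach_rates(ride_requests, rates):
    """LEFT JOIN requests to rates on completed_dt_utc = business_dt
    and currency = base_currency.

    Assumes at most one rate row per (business_dt, base_currency);
    unmatched requests keep None in every rate column.
    """
    index = {(r["business_dt"], r["base_currency"]): r for r in rates}
    joined = []
    for req in ride_requests:
        rate = index.get((req["completed_dt_utc"], req["currency"]), {})
        joined.append({**req, **{col: rate.get(col) for col in RATE_COLUMNS}})
    return joined


# Hypothetical records; the rate values are made up
rates = [{"business_dt": date(2021, 2, 19), "base_currency": "GBP",
          "aed": 5.1, "eur": 1.15, "gbp": 1, "rub": 103.0, "usd": 1.39}]
ride_requests = [
    {"id": 1, "completed_dt_utc": date(2021, 2, 19), "currency": "GBP", "price": 40.0},
    {"id": 2, "completed_dt_utc": date(2021, 2, 20), "currency": "GBP", "price": 25.0},
]
out = attach_rates(ride_requests, rates)
print(out[0]["usd"], out[1]["usd"])
```

In SQL, those NULLs propagate into the converted amounts, which is one more reason the freshness test matters.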

The metrics themselves are converted using a simple macro that accepts a column with the initial amount and a column with the original currency code as input:

-- currency conversion macro
{% macro convert_currency(convert_column, currency_code_column) -%}
     ( {{ convert_column }} * aed )::decimal(18,4) as {{ convert_column }}_aed
   , ( {{ convert_column }} * eur )::decimal(18,4) as {{ convert_column }}_eur
   , ( {{ convert_column }} * gbp )::decimal(18,4) as {{ convert_column }}_gbp
   , ( {{ convert_column }} * rub )::decimal(18,4) as {{ convert_column }}_rub
   , ( {{ convert_column }} * usd )::decimal(18,4) as {{ convert_column }}_usd
{%- endmacro %}
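The following sketch shows what the macro compiles to by reproducing its output text for one column in plain Python. render_convert_currency is a hypothetical helper, not part of dbt:

```python
TARGET_CURRENCIES = ("aed", "eur", "gbp", "rub", "usd")


def render_convert_currency(convert_column: str) -> str:
    """Produce the SQL text the convert_currency macro expands to.

    One output column per target currency: amount * rate, cast to
    decimal(18,4) and suffixed with the currency code.
    """
    exprs = [
        f"( {convert_column} * {cur} )::decimal(18,4) as {convert_column}_{cur}"
        for cur in TARGET_CURRENCIES
    ]
    # First expression has no leading comma; the rest use the leading-comma style
    return "\n".join([exprs[0]] + [", " + e for e in exprs[1:]])


print(render_convert_currency("tips"))
```

The macro body never references currency_code_column. The join has already selected the rates for the row's currency, so the macro only multiplies by the joined aed, eur, gbp, rub and usd columns.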

Practice-oriented development

Working with data is one of the most popular and fastest-growing fields. Every day I run into new, interesting problems and come up with solutions for them. It is an exciting journey that broadens your horizons.

At the end of May, the anniversary run of the Data Engineer course starts at OTUS, where I teach.

Over the past two years, the program has constantly changed and adapted. The upcoming run brings a number of innovations and is built around case studies, real applied problems that engineers face:

  • Data Architecture

  • Data lake

  • Data Warehouse

  • NoSQL / NewSQL

  • MLOps

Program details are available on the course landing page.

I also share my notes and plans in the Telegram channel Technology Enthusiast.

Thank you for your attention.
