How to Optimize Your Data Science Project with Prefect

Is there a way to streamline my Data Science project workflow with just a few lines of code? Yes: Prefect. This is a quick guide to working with the tool.


Why is this needed?

Why should you, as a data scientist, care about optimizing your workflow? Let’s start with an example of a basic data science project. Imagine you are working with the Iris dataset and have started by writing data processing functions.

from typing import Any, Dict, List
import pandas as pd

def load_data(path: str) -> pd.DataFrame:
    ...


def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:
    """Task for getting the classes from the Iris data set."""
    ...


def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """Task for encoding the categorical columns in the Iris data set."""

    ...

def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Task for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    """
    ...

Once you define the functions, you execute them.

# Define parameters
target_col="species"
test_data_ratio = 0.2

# Run functions
data = load_data(path="data/raw/iris.csv")
categorical_columns = encode_categorical_columns(data=data, target_col=target_col)
classes = get_classes(data=data, target_col=target_col) 
train_test_dict = split_data(data=categorical_columns, 
                            test_data_ratio=test_data_ratio, 
                            classes=classes)

The code works and the output looks right, so you might think the workflow is good enough. However, a linear workflow like this one has several disadvantages:

  • If an error occurs in the get_classes function, the output from encode_categorical_columns is lost and the workflow has to start over. This is frustrating if encode_categorical_columns takes a long time to execute.

  • encode_categorical_columns and get_classes do not depend on each other, yet they run one after the other. Running them at the same time would save time and avoid wasting it on functions that fail: if an error occurs in get_classes, the workflow can restart immediately without waiting for encode_categorical_columns to complete.
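Both steps read only the loaded data, so neither needs the other's output. The sketch below shows that idea with the standard library alone. The functions are hypothetical stand-ins that work on a few toy rows, not the article's real implementations, and the thread pool is just one possible way to run them side by side.

```python
from concurrent.futures import ThreadPoolExecutor

# Toy rows standing in for the Iris data (assumption: real code uses pandas).
ROWS = [
    {"sepal_length": 5.1, "species": "setosa"},
    {"sepal_length": 7.0, "species": "versicolor"},
    {"sepal_length": 6.3, "species": "virginica"},
]


def get_classes(rows, target_col):
    """Return the sorted unique labels found in target_col."""
    return sorted({row[target_col] for row in rows})


def encode_categorical_columns(rows, target_col):
    """Replace each label in target_col with an integer code."""
    labels = sorted({row[target_col] for row in rows})
    codes = {label: index for index, label in enumerate(labels)}
    return [{**row, target_col: codes[row[target_col]]} for row in rows]


def run_independent_steps(rows, target_col):
    """Submit both steps at once, since neither needs the other's output."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        classes_future = pool.submit(get_classes, rows, target_col)
        encoded_future = pool.submit(encode_categorical_columns, rows, target_col)
        # result() re-raises any exception raised inside the worker.
        return classes_future.result(), encoded_future.result()


classes, encoded = run_independent_steps(ROWS, "species")
print(classes)
print([row["species"] for row in encoded])
```

Writing this plumbing by hand for every pair of independent steps gets tedious quickly, which is the gap a workflow tool is meant to fill.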

What is Prefect?

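Prefect is an open-source framework for building workflows in Python. It lets you create, run, and manage data pipelines of various scales. The examples in this article use the Prefect 1.x API (with Flow(...), flow.run, flow.register), which later major versions replaced, so install a 1.x release:

pip install "prefect<2"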

Building a workflow with Prefect

To see how Prefect works, let’s wrap the workflow from the beginning of the article in it.

1. Create tasks

A Task is a discrete action in a Prefect flow. Start by turning the functions defined above into tasks with the prefect.task decorator:

from prefect import task
from typing import Any, Dict, List
import pandas as pd

@task
def load_data(path: str) -> pd.DataFrame:
    ...


@task
def get_classes(data: pd.DataFrame, target_col: str) -> List[str]:
    """Task for getting the classes from the Iris data set."""
    ...


@task
def encode_categorical_columns(data: pd.DataFrame, target_col: str) -> pd.DataFrame:
    """Task for encoding the categorical columns in the Iris data set."""
    ...

@task
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Task for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    """
    ... 

2. Create a flow

A Flow represents the entire workflow and manages the dependencies between tasks. To create a flow, place the code that runs your functions inside the with Flow(...) context manager.

from prefect import task, Flow

with Flow("data-engineer") as flow:
    
    # Define parameters
    target_col="species"
    test_data_ratio = 0.2
    
    # Define tasks
    data = load_data(path="data/raw/iris.csv")
    classes = get_classes(data=data, target_col=target_col) 
    categorical_columns = encode_categorical_columns(data=data, target_col=target_col)
    train_test_dict = split_data(data=categorical_columns, test_data_ratio=test_data_ratio, classes=classes)

Note that none of these tasks are executed when the above code runs; it only builds the flow. Prefect lets you either run a flow right away or schedule it. Let’s run the flow right away with flow.run():

with Flow("data-engineer") as flow:
  # Define your flow here
  ...
  
flow.run()

By running the above code, you will get an output similar to this:

└── 15:49:46 | INFO    | Beginning Flow run for 'data-engineer'
└── 15:49:46 | INFO    | Task 'target_col': Starting task run...
└── 15:49:46 | INFO    | Task 'target_col': Finished task run for task with final state: 'Success'
└── 15:49:46 | INFO    | Task 'test_data_ratio': Starting task run...
└── 15:49:47 | INFO    | Task 'test_data_ratio': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'load_data': Starting task run...
└── 15:49:47 | INFO    | Task 'load_data': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Starting task run...
└── 15:49:47 | INFO    | Task 'encode_categorical_columns': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'get_classes': Starting task run...
└── 15:49:47 | INFO    | Task 'get_classes': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Task 'split_data': Starting task run...
└── 15:49:47 | INFO    | Task 'split_data': Finished task run for task with final state: 'Success'
└── 15:49:47 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!

To understand the workflow Prefect has built, let’s visualize it. Start by installing prefect[viz]:

pip install "prefect[viz]"

Then add the visualize method to your code:

flow.visualize()

And you should see a visualization of the data-engineer workflow:

Note that Prefect automatically manages the order of tasks to optimize your workflow. Pretty cool for a few extra lines of code!
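The ordering can be worked out from the dependencies alone. As a rough illustration (not Prefect's actual implementation), the sketch below writes the four tasks of the data-engineer flow as a dependency mapping and asks the standard library's graphlib module for a valid execution order. The DEPENDENCIES dictionary and the execution_order helper are names made up for this example.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Each task maps to the set of tasks whose output it needs.
DEPENDENCIES = {
    "load_data": set(),
    "get_classes": {"load_data"},
    "encode_categorical_columns": {"load_data"},
    "split_data": {"encode_categorical_columns", "get_classes"},
}


def execution_order(dependencies):
    """Return one valid order in which every task runs after its inputs."""
    return list(TopologicalSorter(dependencies).static_order())


order = execution_order(DEPENDENCIES)
print(order)
```

load_data always comes first and split_data last. The two middle tasks may appear in either order, which is exactly why a scheduler is free to run them at the same time.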

3. Add parameters

If you often experiment with different values of the same variable, it is a good idea to turn that variable into a Parameter.

test_data_ratio = 0.2
train_test_dict = split_data(data=categorical_columns, 
                            test_data_ratio=test_data_ratio, 
                            classes=classes)

You can think of a Parameter as a Task, except that it can receive user input every time a flow is run. To turn a variable into a parameter, use prefect.Parameter.

from prefect import task, Flow, Parameter 

test_data_ratio = Parameter("test_data_ratio", default=0.2)

train_test_dict = split_data(data=categorical_columns, 
                            test_data_ratio=test_data_ratio, 
                            classes=classes)

The first argument to Parameter specifies the name of the parameter. default is an optional argument that sets the parameter's default value. Running flow.visualize() again will give this output:

You can override the default parameter value for each run in any of the following ways.

By passing the parameters argument to flow.run():

flow.run(parameters={'test_data_ratio': 0.3})

By using the Prefect CLI:

$ prefect run -p data_engineering.py --param test_data_ratio=0.2

By using the Prefect CLI with a JSON file:

$ prefect run -p data_engineering.py --param-file="params.json"

The JSON file should look something like this:

{"test_data_ratio": 0.3}
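The same kind of JSON file can also be read from Python before calling flow.run. The sketch below is one way to do that under stated assumptions: load_parameters is a hypothetical helper written for this example (it is not part of Prefect), and the DEFAULTS dictionary simply mirrors the default used in the article. It merges the file's values over the defaults and rejects names the flow does not know about.

```python
import json
import os
import tempfile

# Defaults mirroring the Parameter declared in the flow (assumption).
DEFAULTS = {"test_data_ratio": 0.2}


def load_parameters(path, defaults):
    """Read overrides from a JSON file and merge them over the defaults."""
    with open(path) as handle:
        overrides = json.load(handle)
    unknown = set(overrides) - set(defaults)
    if unknown:
        raise KeyError(f"Unknown parameters: {sorted(unknown)}")
    return {**defaults, **overrides}


# Write a throwaway params.json so the example is self-contained.
with tempfile.TemporaryDirectory() as tmp:
    params_path = os.path.join(tmp, "params.json")
    with open(params_path, "w") as handle:
        json.dump({"test_data_ratio": 0.3}, handle)
    params = load_parameters(params_path, DEFAULTS)

print(params)
```

The resulting dictionary has the shape that flow.run(parameters=...) expects, so it could be passed along as flow.run(parameters=params).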

You can also change the parameters for each run using Prefect Cloud. We’ll talk about it below.

Workflow monitoring

Overview

Prefect also lets you monitor your workflow in Prefect Cloud. To install the Prefect Cloud dependencies, follow the Prefect Cloud setup instructions. After installing and configuring all dependencies, start by creating a project:

prefect create project "Iris Project"

Then start a local agent to deploy flows locally on a single machine:

prefect agent local start

Then add this line at the end of your file:

flow.register(project_name="Iris Project")

When you run the file, you should see something similar to this:

Flow URL: https://cloud.prefect.io/khuyentran1476-gmail-com-s-account/flow/dba26bea-8827-4db4-9988-3289f6cb662f
 └── ID: 2944dc36-cdac-4079-8497-be4ec5594785
 └── Project: Iris Project
 └── Labels: ['khuyen-Precision-7740']

Click on the URL in the output and you will be redirected to the overview page. This page displays the version of the flow, the time it was created, the run history of the flow, and a summary of its runs.

You can also view a summary of other runs, their execution times, and their configuration.

It’s great that this information is automatically tracked by Prefect!

Start a workflow with default options

Note that the workflow is registered with Prefect Cloud but is not running yet. To run the workflow with the default settings, click Quick Run in the upper right corner.

Click the run you created. You will now see the activity of your new flow run in real time!

Start a workflow with custom parameters

To run a workflow with custom parameters, go to the Run tab and then change the parameters in the Inputs section.

Once you have configured the parameters, simply click the Run button to start the run.

Viewing a workflow graph

By clicking the Schematic button, you will see a graph of the entire workflow.

Other features

Prefect also provides other features that greatly improve workflow efficiency.

Input caching

Remember the problem we mentioned at the beginning of the article? Typically, if the get_classes function fails, the data generated by the encode_categorical_columns function will be discarded and the entire workflow will have to start from the beginning.

However, Prefect retains the output of encode_categorical_columns. The next time the workflow is restarted, the output of encode_categorical_columns will be used by the next task without restarting encode_categorical_columns.

This can lead to a significant reduction in the execution time of the workflow.
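The effect can be imitated in a few lines of plain Python. This is only an illustration of the idea, not how Prefect stores or restores results: run_cached and the in-memory completed dictionary are made up for this sketch, and the two task functions are toy stand-ins with a switch that simulates a failure.

```python
# In-memory store of finished task outputs, keyed by task name.
# Illustrative only: this is not how Prefect persists results.
completed = {}
call_counts = {"encode": 0, "get_classes": 0}


def run_cached(name, func, *args):
    """Run func once per name; later calls reuse the stored output."""
    if name not in completed:
        completed[name] = func(*args)
    return completed[name]


def encode(labels):
    """Toy encoder: map each label to an integer code."""
    call_counts["encode"] += 1
    codes = {label: index for index, label in enumerate(sorted(set(labels)))}
    return [codes[label] for label in labels]


def get_classes(labels, fail):
    """Toy class lookup that can be told to fail."""
    call_counts["get_classes"] += 1
    if fail:
        raise RuntimeError("simulated failure in get_classes")
    return sorted(set(labels))


def workflow(labels, fail):
    encoded = run_cached("encode", encode, labels)
    classes = run_cached("get_classes", get_classes, labels, fail)
    return encoded, classes


labels = ["setosa", "virginica", "setosa"]
try:
    workflow(labels, fail=True)  # first attempt: get_classes raises
except RuntimeError:
    pass
encoded, classes = workflow(labels, fail=False)  # retry reuses the encode output
print(call_counts)
```

On the retry, encode is not called again because its output was stored before get_classes failed. Only the failed step is repeated.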

Saving output

Sometimes you may need to export a task's data. One way is to put persistence code inside the task function.

def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:

    X_train, X_test, y_train, y_test = ...

    # Persist the split from inside the task
    import pickle
    pickle.dump(...)

However, this makes the function harder to test. Prefect makes it easy to save the result of a task on each run instead. Two steps are needed.

Enable checkpointing:

export PREFECT__FLOWS__CHECKPOINTING=true

Add result=LocalResult(dir=...) to the @task decorator:

from prefect.engine.results import LocalResult

@task(result = LocalResult(dir="data/processed"))
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Task for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    """
    X_train, X_test, y_train, y_test = ...
    
    return dict(
        train_x=X_train,
        train_y=y_train,
        test_x=X_test,
        test_y=y_test,
    )

Now the output of the split_data task will be saved in the data/processed directory:

prefect-result-2021-11-06t15-37-29-605869-00-00

If you want to customize the name of your file, add the target argument to @task:

from prefect.engine.results import LocalResult

@task(target="{date:%a %b %d %H:%M:%S %Y}/{task_name}_output", 
      result = LocalResult(dir="data/processed"))
def split_data(data: pd.DataFrame, test_data_ratio: float, classes: list) -> Dict[str, Any]:
    """Task for splitting the classical Iris data set into training and test
    sets, each split into features and labels.
    """
    ...
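The target string reads like an ordinary Python format template. Assuming it is filled in with str.format (an assumption about Prefect's internals that is worth checking against its documentation), the sketch below shows what the template from the example would produce for a fixed date. render_target is a hypothetical helper, and the date is picked arbitrarily.

```python
from datetime import datetime

# The template from the example above.
TARGET = "{date:%a %b %d %H:%M:%S %Y}/{task_name}_output"


def render_target(template, date, task_name):
    """Fill the template the way str.format would (assumed behaviour)."""
    return template.format(date=date, task_name=task_name)


rendered = render_target(TARGET, datetime(2021, 11, 6, 15, 37, 29), "split_data")
print(rendered)  # Sat Nov 06 15:37:29 2021/split_data_output
```

Note that this particular template puts spaces and colons in the directory name. A format such as %Y-%m-%d may be easier to work with on some file systems.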

Prefect also provides other Result classes such as GCSResult and S3Result. See the Results section of the Prefect API documentation for the full list.

Using the output of another flow in the current flow

If you are working with multiple flows, such as a data-engineer flow and a data-science flow, you might want to use the output of data-engineer in data-science.

After saving the output of data-engineer, you can read it using the read method.

from prefect.engine.results import LocalResult

train_test_dict = LocalResult(dir=...).read(location=...).value
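The write-then-read handoff between two flows can be pictured with plain pickle files. This sketch does not use Prefect at all: write_result and read_result are hypothetical helpers, the temporary directory stands in for data/processed, and the dictionary holds toy values rather than a real train/test split.

```python
import os
import pickle
import tempfile


def write_result(value, directory, name):
    """Pickle value to directory/name and return the file location."""
    os.makedirs(directory, exist_ok=True)
    location = os.path.join(directory, name)
    with open(location, "wb") as handle:
        pickle.dump(value, handle)
    return location


def read_result(location):
    """Load a value previously written by write_result."""
    with open(location, "rb") as handle:
        return pickle.load(handle)


with tempfile.TemporaryDirectory() as tmp:
    # "Upstream flow": save the output of its last task.
    location = write_result(
        {"train_x": [[5.1, 3.5]], "train_y": [0]},
        os.path.join(tmp, "processed"),
        "split_data_output",
    )
    # "Downstream flow": pick the output up from the agreed location.
    train_test_dict = read_result(location)

print(train_test_dict["train_y"])
```

The only thing the two sides have to share is the location, which is why a predictable file name matters when one flow consumes the output of another.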

Connecting dependent flows

Imagine this situation: you have created two flows, and one depends on the other. The data-engineer flow must finish before the data-science flow starts. Someone who did not understand this relationship ran both flows at the same time, and an error occurred!

To prevent this from happening, we must define the relationship between the flows. Fortunately, Prefect makes this easy. Start by wrapping the two flows with StartFlowRun. Pass wait=True so that the downstream flow only executes after the upstream flow has finished.

from prefect import Flow 
from prefect.tasks.prefect import StartFlowRun

data_engineering_flow = StartFlowRun(flow_name="data-engineer", 
                                    project_name="Iris Project",
                                    wait=True)
                                    
data_science_flow = StartFlowRun(flow_name="data-science", 
                                project_name="Iris Project",
                                wait=True)

Next, call data_science_flow inside a with Flow(...) context manager. Use upstream_tasks to specify the tasks or flows that must run before data-science.

with Flow("main-flow") as flow:
    result = data_science_flow(upstream_tasks=[data_engineering_flow])
    
flow.run()

Now the two flows are connected:

Very cool!

Schedule your flow

Prefect also makes it easy to run a flow at a specific time or at a fixed interval. For example, to run a flow every minute, create an IntervalSchedule and pass it as schedule to with Flow(...):

from datetime import datetime, timedelta

from prefect import Flow
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(
    start_date=datetime.utcnow() + timedelta(seconds=1),
    interval=timedelta(minutes=1),
)

data_engineering_flow = ...
data_science_flow = ...


with Flow("main-flow", schedule=schedule) as flow:
    data_science = data_science_flow(upstream_tasks=[data_engineering_flow])

Now your flow will run every minute! You can find out more in the scheduling section of the Prefect documentation.
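An interval schedule boils down to simple date arithmetic: start at start_date and keep adding interval. The sketch below computes upcoming run times that way with the standard library only. next_runs is a hypothetical helper for illustration, and it makes no claim about how Prefect treats boundary cases such as a time that falls exactly on a scheduled run.

```python
from datetime import datetime, timedelta


def next_runs(start_date, interval, after, count):
    """Return the first `count` scheduled times strictly after `after`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if after < start_date:
        first = start_date
    else:
        # Whole intervals already elapsed, plus one to move past `after`.
        steps = (after - start_date) // interval + 1
        first = start_date + steps * interval
    return [first + index * interval for index in range(count)]


start = datetime(2021, 11, 6, 12, 0, 0)
runs = next_runs(
    start_date=start,
    interval=timedelta(minutes=1),
    after=datetime(2021, 11, 6, 12, 2, 30),
    count=3,
)
for run in runs:
    print(run.isoformat())
```

With a one-minute interval starting at 12:00:00, the next three runs after 12:02:30 fall at 12:03, 12:04, and 12:05.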

Logging

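You can send the output of print calls inside a task to the logs by adding log_stdout=True to @task:

import numpy as np
import pandas as pd
from prefect import task

@task(log_stdout=True)
def report_accuracy(predictions: np.ndarray, test_y: pd.DataFrame) -> None:

    target = ...
    accuracy = ...

    # With log_stdout=True, this print shows up in the task's logs
    print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")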

The output will be something like this:

[2021-11-06 11:41:16-0500] INFO - prefect.TaskRunner | Model accuracy on test set: 93.33
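Conceptually, this amounts to capturing what the function prints and handing each line to a logger. The sketch below imitates that with the standard library. It is an illustration of the idea, not Prefect's implementation: run_with_logged_stdout, ListHandler, and the logger name are invented for this example, and report_accuracy is simplified to take the accuracy directly.

```python
import contextlib
import io
import logging


class ListHandler(logging.Handler):
    """Logging handler that keeps formatted messages in a list."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


def run_with_logged_stdout(func, logger, *args):
    """Call func, then forward every non-empty printed line to logger.info."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        result = func(*args)
    for line in buffer.getvalue().splitlines():
        if line.strip():
            logger.info(line)
    return result


def report_accuracy(accuracy):
    print(f"Model accuracy on test set: {round(accuracy * 100, 2)}")


logger = logging.getLogger("sketch.task_runner")
logger.setLevel(logging.INFO)
handler = ListHandler()
logger.addHandler(handler)

run_with_logged_stdout(report_accuracy, logger, 0.9333)
```

After the call, handler.messages holds the printed line as a log message, so it can be timestamped, filtered, and shipped like any other log record.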

Conclusion

Congratulations! We’ve just seen how Prefect optimizes your Data Science workflow with a few lines of code. A small optimization in your code can lead to huge efficiency gains in the long run. Feel free to fork the source code from this article:

Code on GitHub.
