MLOps: Hands-on Creating a Scalable Machine Learning Pipeline
Creating a scalable machine learning (ML) pipeline involves building a robust and flexible infrastructure that can handle the growing demands of data volume, model complexity, and deployment needs.
We will develop an ML model that predicts whether a job description is toxic. Model creation must be scalable, collaborative, and reproducible. The principles, tools, and techniques that make models scalable, collaborative, and reproducible are known as MLOps.
Key perspectives from roles in MLOps lifecycle
MLOps (Machine Learning Operations) is an essential set of practices and principles that ensure machine learning models are scalable, collaborative, and reproducible. It builds on principles from DevOps (development and operations) but is tailored specifically for the unique challenges posed by machine learning workflows, including data, model development, testing, deployment, and maintenance.
Each role has a unique set of concerns when it comes to the health, performance, and impact of a machine learning model. Here’s a brief elaboration on the questions they often ask:
Data Scientist: “Why is the model drifting?”
Focuses on understanding data drift and concept drift, which occur when the model’s input data distribution or the relationship between input and target variables changes over time. This can degrade model performance.
Data Science Manager: “Is it time to retrain?”
Looks at the retraining schedule and model performance metrics to decide when it’s necessary to refresh the model with new data, ensuring that it continues to serve its purpose efficiently.
Product Manager: “What are the model limitations?”
Focuses on understanding the boundaries of the model, including its accuracy, interpretability, and situations where it might fail, to set appropriate expectations and communicate these to end-users or customers.
Data Engineer: “Does it get the right data?”
Ensures that the data pipeline feeding the model is functioning properly, with correct, timely, and relevant data being fed into the model consistently.
Model User: “Can I trust these predictions?”
Concerned with the confidence and reliability of the model outputs in real-world use cases, often needing interpretability and transparency to develop trust.
Business Stakeholder: “How much value does the model bring?”
Evaluates the business impact of the model by looking at the ROI (return on investment), cost savings, increased efficiency, or revenue generation it brings to the organization.
Support: “Why did we mark this as spam?”
Needs to understand how the model makes specific decisions, especially in cases of user queries or complaints, often needing an explanation of individual predictions.
Compliance: “Is this model safe to use?”
Concerned with whether the model complies with legal regulations and ethical guidelines, ensuring that it meets standards around fairness, privacy, and security.
Each role focuses on a different aspect of the model lifecycle, and MLOps practices help facilitate collaboration across these diverse perspectives to ensure smooth deployment and monitoring of machine learning models.
Let’s break down the pipeline into steps and build it with a focus on scalability and automation.
Install libraries and prepare the environment
I will use a laptop with Ubuntu 23.10. To set up a Python virtual environment and install the required libraries for the project, follow these steps:
# mkdir toxic && cd toxic
# python -V
Python 3.11.6
# python -m venv venv
# source venv/bin/activate
# pip install jupyter
# jupyter notebook
A browser window should immediately pop up with the Jupyter Notebook interface. As you might have already noticed, Jupyter's notebooks and dashboard are web apps: Jupyter starts a local Python server and serves them to your browser. This makes Jupyter Notebooks platform independent and easier to share with others.
sudo apt install docker-ce
This completes the Docker installation in your local environment (the docker-ce package assumes Docker's official apt repository has been added; Ubuntu's own docker.io package is an alternative).
Cookiecutter for managing the structure of the ML model
Using Cookiecutter to manage the structure of your machine learning projects can greatly streamline development by providing a consistent and organized setup. It allows you to create templates that scaffold out the structure for your ML projects, ensuring that they follow best practices from the start.
pip install cookiecutter
cookiecutter -c v1 https://github.com/drivendata/cookiecutter-data-science
Once you generate the project, here’s what the structure might look like:
├── LICENSE
├── Makefile <- Makefile with commands like `make data` or `make train`
├── README.md <- The top-level README for developers using this project.
├── data
│ ├── external <- Data from third party sources.
│ ├── interim <- Intermediate data that has been transformed.
│ ├── processed <- The final, canonical data sets for modeling.
│ └── raw <- The original, immutable data dump.
│
├── docs <- A default Sphinx project; see sphinx-doc.org for details
│
├── models <- Trained and serialized models, model predictions, or model summaries
│
├── notebooks <- Jupyter notebooks. Naming convention is a number (for ordering),
│ the creator's initials, and a short `-` delimited description, e.g.
│ `1.0-jqp-initial-data-exploration`.
│
├── references <- Data dictionaries, manuals, and all other explanatory materials.
│
├── reports <- Generated analysis as HTML, PDF, LaTeX, etc.
│ └── figures <- Generated graphics and figures to be used in reporting
│
├── requirements.txt <- The requirements file for reproducing the analysis environment, e.g.
│ generated with `pip freeze > requirements.txt`
│
├── setup.py <- makes project pip installable (pip install -e .) so src can be imported
├── src <- Source code for use in this project.
│ ├── __init__.py <- Makes src a Python module
│ │
│ ├── data <- Scripts to download or generate data
│ │ └── make_dataset.py
│ │
│ ├── features <- Scripts to turn raw data into features for modeling
│ │ └── build_features.py
│ │
│ ├── models <- Scripts to train models and then use trained models to make
│ │ │ predictions
│ │ ├── predict_model.py
│ │ └── train_model.py
│ │
│ └── visualization <- Scripts to create exploratory and results oriented visualizations
│ └── visualize.py
│
└── tox.ini <- tox file with settings for running tox; see tox.readthedocs.io
You can customize the cookiecutter-data-science template, or create your own, to match your specific workflow and organizational preferences.
- Custom data pipelines: Modify or add scripts in the `src` directory to create feature engineering or custom data processing pipelines.
- Models and experiments: Add your custom machine learning models and track your experiments.
- Version control: Use Git to version your code and track your changes within this structure.
Once you have your project structure in place, you can integrate it with tools like MLflow, DVC, or Kubeflow to handle model versioning, experiment tracking, and deployment. You can add these tools into the `src/models` section to streamline your model lifecycle management.
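To make the custom data pipeline point above concrete, a script in `src/data` might look like the minimal sketch below (the file paths are assumptions; the column names come from the dataset used later in this article):
# src/data/make_dataset.py (a sketch)
import pandas as pd
from pathlib import Path

RAW_PATH = Path("data/raw/toxic_df_sample.csv")          # assumed location of the raw dump
PROCESSED_PATH = Path("data/processed/toxic_clean.csv")  # assumed output location

def make_dataset():
    """Load the raw data, keep the columns we need, and write a clean copy."""
    df = pd.read_csv(RAW_PATH)
    df = df[["is_toxic", "jobdescription_en"]].dropna()
    PROCESSED_PATH.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(PROCESSED_PATH, index=False)
    return df

if __name__ == "__main__":
    make_dataset()
Wiring a script like this into the Makefile's `make data` target keeps the raw dump immutable while producing a reproducible processed dataset.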
Poetry for dependency management
Poetry is a fantastic tool for managing Python dependencies and packaging, especially for machine learning projects where maintaining consistent environments is crucial. Unlike `pip` and `virtualenv`, Poetry handles both dependency management and packaging, making it easier to manage project environments and dependencies.
curl -sSL https://install.python-poetry.org | python3 -
poetry init
poetry add requests
poetry show --tree
requests 2.32.3 Python HTTP for Humans.
├── certifi >=2017.4.17
├── charset-normalizer >=2,<4
├── idna >=2.5,<4
└── urllib3 >=1.21.1,<3
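The dependency specification itself lives in `pyproject.toml`. A minimal sketch of what it might look like after `poetry init` and `poetry add requests` (the section layout follows Poetry 1.x; project name, versions, and author are assumptions):
[tool.poetry]
name = "toxic"
version = "0.1.0"
description = "Toxic job description classification"
authors = ["Your Name <you@example.com>"]

[tool.poetry.dependencies]
python = "^3.11"
requests = "^2.32.3"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Committing `pyproject.toml` together with the generated `poetry.lock` file is what makes the environment reproducible for collaborators.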
You can use Poetry alongside Cookiecutter to set up and manage your machine learning project structure. Just add Poetry to your template as the dependency manager, and it will automatically handle dependencies within the organized project structure. Additionally, you can integrate Poetry with MLOps tools like MLflow or DVC for experiment tracking and data versioning.
Code review with Black and Flake8
Using Black and Flake8 together is a great way to maintain high code quality and consistency in your machine learning (or any Python) projects. Black formats your code automatically, ensuring a uniform style, while Flake8 is a linter that checks for coding errors, potential bugs, stylistic issues, and PEP8 compliance.
First, ensure both tools are installed. If you’re using Poetry, you can add them as development dependencies:
poetry add --dev black flake8
If you’re using pip, you can install them globally or within your virtual environment:
pip install black flake8
To format your code using Black, simply run the following command from the root directory of your project:
black .
This will format all `.py` files in the current directory (and subdirectories) according to Black's opinionated style. Black automatically makes decisions about line lengths, indentation, and more, so you don't have to worry about the specifics.
To check your code for style issues, potential errors, and PEP8 violations using Flake8, run:
flake8 .
This will lint all `.py` files in the current directory. Flake8 will output warnings and errors for things like:
- Line length violations
- Unused imports/variables
- Incorrect indentation
- Missing docstrings
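Black and Flake8 can disagree out of the box: Black uses 88-character lines and formats slices in a way that triggers Flake8's E203 warning. A common fix is to align Flake8 with Black's defaults via a small config file, e.g. in `setup.cfg` or `.flake8` (a sketch; the exclude list is an assumption):
[flake8]
max-line-length = 88
extend-ignore = E203
exclude = .git,__pycache__,venv,.venv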
Automatically generate documentation for ML project
Automatically generating documentation for a machine learning project can save a lot of time and ensure consistency in explaining the structure, usage, and purpose of the project. You can leverage tools like Sphinx, MkDocs, or pdoc to generate project documentation, especially if you’re using Python.
If you’re already using Cookiecutter and it comes with Sphinx integrated, you can leverage the existing structure to automatically generate and manage your documentation.
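Assuming the `docs/` folder from the template contains the default Sphinx setup, a typical flow is to generate API stubs from `src` and build the HTML (exact paths depend on your template version):
pip install sphinx
sphinx-apidoc -o docs/ src/
cd docs && make html
With the standard Sphinx Makefile, the generated pages end up under `docs/_build/html`.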
Solution Design
Designing a solution, especially for complex systems like a machine learning project or a data pipeline, requires breaking down the problem into manageable components. Solution design typically includes aspects like architecture, data flow, component responsibilities, technology stack, scalability, and security.
Problem Statement and Scope Definition
The first step is to clearly define the problem you are solving. This section provides context for the solution, including:
- Business Problem: What specific business issue are you addressing? (e.g., predicting customer churn, improving recommendation accuracy). Here we focus on detecting toxic job descriptions to improve workplace culture, attract talent, and reduce the risks associated with a negative work environment.
- Target Users: Who will use the solution? (e.g., data scientists, product managers, end-users). The solution would be used primarily by Human Resources (HR) teams, Recruiters, Hiring Managers, and Executive Leadership.
- Functional Requirements: What functionality is required? (e.g., train a model, serve predictions, monitor performance). Critical features required: Data Ingestion and Preprocessing, Toxicity Classification Model, Dashboard for Insights and Results Visualization, Automated Feedback and Suggestions, User Permissions and Role-based Access Control, Feedback Loop and Model Improvement, Automated Reporting and Alerts, Compliance and Legal Checks, Integration with External Systems, Model Management and Experiment Tracking.
- Non-Functional Requirements: What constraints need to be met? (e.g., scalability, latency, security, data privacy). Key NFRs: Performance, Accuracy and Precision, Security and Data Privacy, Usability, Reliability and Availability, Maintainability and Flexibility, Integration and Compatibility, Compliance and Ethical Considerations, Cost and Resource Constraints.
High-Level Architecture
This is a blueprint of the system. It should describe the components, how they interact, and the overall data flow.
Components in a Typical ML Solution:
Data Source:
- Where does the data come from? (e.g., relational databases, APIs, data lakes)
- What are the data formats? (e.g., CSV, JSON, Parquet)
Data Pipeline:
- How will data be ingested and preprocessed?
- ETL/ELT processes, data validation, cleaning, and transformation steps.
Feature Engineering:
- Describe how features are derived from raw data.
- Tools: Pandas, Spark, or Feature Stores (e.g., Tecton, Feast).
Model Training:
- What algorithms or models will you use? (e.g., regression, deep learning, decision trees)
- How will hyperparameter tuning be managed? (e.g., GridSearchCV, Hyperopt)
- Tools: Scikit-learn, TensorFlow, PyTorch.
Model Serving and API:
- How will the trained model be deployed and served? (e.g., REST API, streaming service)
- Tools: Flask, FastAPI, TensorFlow Serving, Kubernetes, Docker.
Monitoring and Logging:
- How will you track model performance (accuracy, latency) in production?
- Tools: Prometheus, Grafana, ELK stack (Elasticsearch, Logstash, Kibana).
Model Retraining:
- What triggers model retraining? (e.g., data drift, model performance degradation)
- How is retraining automated? (e.g., via Airflow, Jenkins)
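As a sketch of the retraining automation mentioned above, an Airflow 2.x DAG could run the project's Makefile targets on a schedule (the DAG id, schedule, and task commands are assumptions, not part of this project yet):
# retrain_dag.py: a hypothetical Airflow 2.x DAG for periodic retraining
from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="toxic_model_retraining",   # assumed DAG name
    start_date=datetime(2024, 1, 1),
    schedule="@weekly",                # retrain weekly; a drift alert could trigger it instead
    catchup=False,
) as dag:
    make_data = BashOperator(task_id="make_data", bash_command="make data")
    train = BashOperator(task_id="train", bash_command="make train")
    make_data >> train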
Detailed Component Design
After defining the architecture, dive into the details of each component.
Data Pipeline:
- Ingestion: Define how data enters the system, whether batch or real-time.
- Transformation: Specify the transformations and features that will be generated.
- Storage: Decide where to store raw and processed data (e.g., S3, HDFS).
Model Training:
- Algorithm Selection: Document why a specific algorithm is chosen based on the problem.
- Training Infrastructure: Should the model be trained on a local machine, distributed cluster (e.g., Spark), or GPU/TPU?
- Versioning: How will models be versioned? Tools like MLflow or DVC can be used to track model versions.
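For the versioning point above, DVC pairs naturally with the Git-based project structure. A minimal sketch of tracking a saved model directory (the path under `models/` is an assumption; the same commands work for datasets):
dvc init
dvc add models/toxic_job_description_model
git add models/toxic_job_description_model.dvc models/.gitignore
git commit -m "Track trained model with DVC"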
Model Serving:
- Containerization: Containerize the model (using Docker) for consistency across environments.
- Scaling: Use Kubernetes or AWS Lambda for scaling the model inference service.
- Latency Optimization: If latency is critical, consider optimizing models (e.g., model quantization, ONNX for model optimization).
Monitoring:
- Metrics to Monitor: Accuracy, F1-score, latency, prediction distribution, data drift.
- Alerting: Set thresholds for alerts when performance degrades or anomalies occur.
Technology Stack
Choose the tools and technologies best suited for your solution.
Data Ingestion:
- Batch: Apache Airflow, Luigi
- Real-time: Kafka, AWS Kinesis, Google Pub/Sub
Data Processing:
- Pandas (for smaller datasets)
- PySpark, Dask (for large-scale distributed processing)
Model Training:
- Scikit-learn, XGBoost (for classical ML)
- TensorFlow, PyTorch (for deep learning)
Deployment:
- FastAPI, Flask (for building APIs)
- Docker (for containerization)
- Kubernetes (for orchestration and scaling)
Monitoring:
- Prometheus (for metrics)
- Grafana (for dashboards)
- ELK Stack (for logs)
Security and Compliance
- Data Security: Ensure that sensitive data is encrypted both at rest and in transit. Use encryption libraries (e.g., AWS KMS, GCP KMS).
- Authentication and Authorization: Secure APIs with OAuth2, JWT, or other authentication mechanisms.
- Compliance: Ensure that your solution complies with regulations like GDPR (data privacy), HIPAA (healthcare), or industry-specific standards.
Deployment Strategy
Define the steps and environment for deploying the solution.
- Environment: Development, staging, and production environments.
- CI/CD Pipeline: Automate testing, integration, and deployment using Jenkins, GitHub Actions, or GitLab CI.
- Blue-Green or Canary Deployment: For safe model updates, use blue-green or canary deployments to reduce downtime and mitigate risks.
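For the CI/CD pipeline above, a minimal GitHub Actions workflow could run the Black and Flake8 checks from earlier on every push (a sketch; the workflow name, file name, and Python version are assumptions):
# .github/workflows/ci.yml (a sketch)
name: CI
on: [push, pull_request]

jobs:
  lint:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install black flake8
      - run: black --check .
      - run: flake8 .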
Risk Assessment and Mitigation
Identify potential risks and the mitigation plan for each.
- Data Drift: Solution: Set up a data drift detection mechanism.
- Model Degradation: Solution: Automate retraining and periodic evaluation of models.
- Scaling Bottlenecks: Solution: Design for horizontal scaling and stress test the infrastructure.
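As a sketch of the drift-detection mitigation above, one simple approach is to compare the distribution of a monitored feature (or of prediction scores) between a reference window and recent production data, for example with a Kolmogorov-Smirnov test (the threshold and variable names are assumptions):
import numpy as np
from scipy.stats import ks_2samp

def detect_drift(reference: np.ndarray, current: np.ndarray, alpha: float = 0.05) -> bool:
    """Return True if the two samples likely come from different distributions."""
    statistic, p_value = ks_2samp(reference, current)
    return p_value < alpha

# Example: compare prediction scores collected at training time with last week's traffic
# drift_detected = detect_drift(train_scores, recent_scores)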
Future Enhancements and Roadmap
- Feature Enhancements: Adding new features or integrating with other systems (e.g., using additional data sources, implementing new models).
- Automated Retraining: Expanding automated model retraining and performance monitoring.
- Advanced Monitoring: Implementing more advanced monitoring such as real-time feedback loops or anomaly detection in predictions.
Automating the ML Model Cycle
Create a New Repository on DagsHub
- Go to DagsHub and log in to your account.
- Click on the “New Repository” button.
- Fill in your repository details (name, description, etc.).
- Initialize the repository with a README file if you want.
- Click “Create Repository”.
Load the Dataset in a Notebook
- Clone the repository to your local machine or directly on DagsHub if you’re using its hosted notebooks.
- Upload your dataset (`toxic_df_sample.csv`) to the repository.
Code to load the dataset in a Jupyter Notebook:
# Install the DagsHub python client
!pip install -q dagshub
!pip install mlflow
from dagshub.data_engine import datasources
ds_list = datasources.get_datasources('oleh.dubetcky/toxic')
ds = ds_list[0]
print(ds)
import pandas as pd
df = pd.read_csv(ds.head().dataframe.dagshub_download_url[0])
# Preview the data
df.head()
# Filter necessary columns
df = df[['is_toxic', 'jobdescription_en']]
# Drop missing values
df.dropna(inplace=True)
Set Up TinyBERT for Toxic Job Classification
Using TinyBERT for toxic job description classification is a great idea, as TinyBERT is a compact and efficient transformer model specifically designed to reduce computational complexity while maintaining high performance.
# Selecting the relevant columns
texts = df['jobdescription_en'].values
labels = df['is_toxic'].values
from transformers import BertTokenizer, BertForSequenceClassification
from torch.utils.data import DataLoader, Dataset
import torch
# Load TinyBERT tokenizer and model
tokenizer = BertTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
model = BertForSequenceClassification.from_pretrained('huawei-noah/TinyBERT_General_4L_312D', num_labels=2)
class ToxicDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_len):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, index):
        text = self.texts[index]
        label = self.labels[index]
        # Tokenize and encode the text, returning input IDs and attention mask
        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_len,
            return_token_type_ids=False,
            padding='max_length',
            truncation=True,
            return_attention_mask=True,
            return_tensors='pt',
        )
        # Return a dictionary of the processed data with labels as tensors
        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'label': torch.tensor(label, dtype=torch.long)
        }
MAX_LEN = 128
dataset = ToxicDataset(texts, labels, tokenizer, MAX_LEN)
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader
# Split the data into training and validation sets
train_texts, val_texts, train_labels, val_labels = train_test_split(texts, labels, test_size=0.5, random_state=42)
# Create the datasets
train_dataset = ToxicDataset(train_texts, train_labels, tokenizer, MAX_LEN)
eval_dataset = ToxicDataset(val_texts, val_labels, tokenizer, MAX_LEN)
# Create the dataloaders
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
eval_loader = DataLoader(eval_dataset, batch_size=16)
Track Experiments with MLflow
import dagshub
dagshub.init(repo_owner='oleh.dubetcky', repo_name='toxic', mlflow=True)
# set mlflow tracking uri
import mlflow
mlflow.set_tracking_uri("https://dagshub.com/oleh.dubetcky/toxic.mlflow")
from transformers import AutoModelForSequenceClassification, AutoTokenizer, AdamW
from torch.optim import lr_scheduler
import torch
# Initialize model and tokenizer
model = AutoModelForSequenceClassification.from_pretrained('huawei-noah/TinyBERT_General_4L_312D', num_labels=2)
tokenizer = AutoTokenizer.from_pretrained('huawei-noah/TinyBERT_General_4L_312D')
# Define paths to save the model and tokenizer
model_dir = './toxic_job_description_model'
tokenizer_dir = './toxic_job_description_tokenizer'
# Optimizer and scheduler
optimizer = AdamW(model.parameters(), lr=2e-5, correct_bias=False)
scheduler = lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.1)
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = model.to(device)
# Train the model for one epoch and return the average loss
def train_epoch(model, data_loader, optimizer, device):
    model.train()
    total_loss = 0
    for batch in data_loader:
        optimizer.zero_grad()
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['label'].to(device)
        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
        loss = outputs.loss
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    return total_loss / len(data_loader)
# MLflow setup
experiment_name = "toxic_job_description_experiment"
mlflow.set_experiment(experiment_name)
# Main training loop with MLflow logging
EPOCHS = 5
with mlflow.start_run(run_name="tinybert_toxic_job_description"):
    # Log model configuration and hyperparameters
    mlflow.log_param("learning_rate", 2e-5)
    mlflow.log_param("epochs", EPOCHS)
    mlflow.log_param("step_size", 2)
    mlflow.log_param("gamma", 0.1)
    # Optional: Log any additional metrics, parameters, etc.
    mlflow.log_param("model_type", "TinyBERT")
    mlflow.log_param("dataset", "toxic_job_description")

    for epoch in range(EPOCHS):
        train_loss = train_epoch(model, train_loader, optimizer, device)
        scheduler.step()  # Update learning rate
        current_lr = scheduler.get_last_lr()[0]

        # Log training loss and learning rate
        mlflow.log_metric("train_loss", train_loss, step=epoch)
        mlflow.log_metric("learning_rate", current_lr, step=epoch)
        print(f'Epoch {epoch + 1}, Loss: {train_loss}, Learning Rate: {current_lr}')

    # Save and log final model and tokenizer
    model.save_pretrained(model_dir)
    tokenizer.save_pretrained(tokenizer_dir)
    mlflow.log_artifacts(model_dir, artifact_path="final_model")
    mlflow.log_artifacts(tokenizer_dir, artifact_path="final_tokenizer")

    # Register the model
    run = mlflow.active_run()
    model_uri = f"runs:/{run.info.run_id}/final_model"
    mlflow.register_model(model_uri, name="toxic_job_description_model")
    print("Model registered in MLflow.")
def eval_model(model, data_loader, device):
    model.eval()
    correct_predictions = 0
    total_predictions = 0
    with torch.no_grad():
        for batch in data_loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)
            outputs = model(input_ids, attention_mask=attention_mask)
            _, preds = torch.max(outputs.logits, dim=1)
            correct_predictions += torch.sum(preds == labels)
            total_predictions += len(labels)
    return correct_predictions.double() / total_predictions
accuracy = eval_model(model, eval_loader, device)
print(f'Validation Accuracy: {accuracy:.4f}')
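The Streamlit app in the next section loads the model from the local directories saved above. If you prefer to serve the artifacts that were logged to MLflow instead, you could download them first; a sketch (replace the <RUN_ID> placeholder with a real run ID from the experiment in DagsHub/MLflow):
import mlflow
from transformers import BertForSequenceClassification, BertTokenizer

mlflow.set_tracking_uri("https://dagshub.com/oleh.dubetcky/toxic.mlflow")

# Download the artifacts logged under "final_model" and "final_tokenizer" for a given run
model_dir = mlflow.artifacts.download_artifacts("runs:/<RUN_ID>/final_model")
tokenizer_dir = mlflow.artifacts.download_artifacts("runs:/<RUN_ID>/final_tokenizer")

model = BertForSequenceClassification.from_pretrained(model_dir)
tokenizer = BertTokenizer.from_pretrained(tokenizer_dir)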
Model serving with Web Application
To deploy your TinyBERT model using Streamlit, you can create an interactive web application that allows users to input data and see predictions in real time.
Install Streamlit
If you haven’t already, install Streamlit:
pip install streamlit
Create a Streamlit App
Create a new Python file, e.g., `app.py`, and write the following code:
import streamlit as st
import torch
from transformers import BertForSequenceClassification, BertTokenizer
# Load the saved model and tokenizer
model = BertForSequenceClassification.from_pretrained('./toxic_job_description_model')
tokenizer = BertTokenizer.from_pretrained('./toxic_job_description_tokenizer')
# Function to make predictions
def predict(text):
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=128)
    with torch.no_grad():
        outputs = model(**inputs)
    logits = outputs.logits
    probabilities = torch.nn.functional.softmax(logits, dim=-1)
    predicted_class = torch.argmax(probabilities).item()
    return predicted_class, probabilities[0][predicted_class].item()
# Streamlit app layout
st.title("Toxic Job Description Classification")
st.write("Enter a job description to check if it's toxic or not.")
# Text input for job description
job_description = st.text_area("Job Description:")
if st.button("Predict"):
    if job_description:
        # Make prediction
        predicted_class, confidence = predict(job_description)
        label = "Toxic" if predicted_class == 1 else "Non-Toxic"
        st.success(f"Prediction: {label} (Confidence: {confidence:.2f})")
    else:
        st.error("Please enter a job description.")
Run the Streamlit App
Open a terminal and run the Streamlit app with the following command:
streamlit run app.py
This will start the Streamlit server, and you should see a URL (usually http://localhost:8501) where you can access your app in a web browser.
Deploying the App
To deploy your Streamlit app, you can use platforms like Streamlit Sharing, Heroku, or DagsHub (if you have a specific setup for Streamlit).
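Since Docker was installed at the beginning, another option is to containerize the app yourself and run it anywhere Docker runs. A minimal `Dockerfile` sketch (the base image, file layout, and `requirements.txt` contents are assumptions):
# Dockerfile (a sketch)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8501
CMD ["streamlit", "run", "app.py", "--server.port=8501", "--server.address=0.0.0.0"]
Build and run it locally with `docker build -t toxic-app .` and `docker run -p 8501:8501 toxic-app`.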
Deploying on Streamlit Sharing
- Create a GitHub repository for your project.
- Push your `app.py` and any other necessary files (like `requirements.txt`).
- Go to Streamlit Sharing, sign in with your GitHub account, and deploy your app from your repository.
Deploying on DagsHub
You can also host the Streamlit app on DagsHub:
- Push your app code to your DagsHub repository.
- You might need to set up a DagsHub Streamlit deployment configuration in your repository settings.
You can see the deployed app at https://toxicjob.streamlit.app/
If you liked the article, you can support the author by clapping below 👏🏻 Thanks for reading!