Active Learning Pipeline¶
Data Engine enables companies of any size, or even individual developers, to implement an active learning pipeline without the need for large amounts of resources.
This documentation is based on the active learning pipeline created in the Tooth Fairy repo notebook.
This pipeline will use the following tools, available with a DagsHub account:
- Data Engine - enables and orchestrates the entire active learning process
- MLflow - used to log and track experiments and models
- Label Studio - annotation tool
Each DagsHub repo comes with the above tools.
This documentation assumes you've already connected a Datasource.
0. Setup¶
The most general setup we need to perform is authentication to DagsHub and setting up the MLflow tracking URI. This happens in step 0 of the notebook.
import os

import dagshub
import mlflow
DAGSHUB_TOKEN = dagshub.auth.get_token()
DAGSHUB_USER = "yonomitt"
DAGSHUB_REPO = "ToothFairy"
DATASOURCE_PATH = "s3://tooth-dataset/data"
DAGSHUB_FULL_REPO = DAGSHUB_USER + "/" + DAGSHUB_REPO
MLFLOW_TRACKING_URI = f"https://dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}.mlflow"
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
os.environ["MLFLOW_TRACKING_USERNAME"] = DAGSHUB_USER
os.environ["MLFLOW_TRACKING_PASSWORD"] = DAGSHUB_TOKEN
By setting the tracking URI and the environment variables above, the MLflow client will be configured to communicate with the repo's MLflow server.
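Optionally, you can group all runs from this pipeline under a single named experiment; the experiment name below is just an example, not part of the original notebook.
# optional: group all active learning runs under one MLflow experiment
mlflow.set_experiment("active-learning-pipeline")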
1. Getting or creating the Datasource¶
Since Data Engine works primarily with Datasource objects, we need to get or create one.
from dagshub.data_engine import datasources

def get_or_create_datasource(name):
    try:
        # get the Datasource named `name`
        ds = datasources.get_datasource(repo=DAGSHUB_FULL_REPO, name=name)
    except:
        # the Datasource named `name` doesn't exist, so create it
        ds = datasources.create(repo=DAGSHUB_FULL_REPO, name=name, path=DATASOURCE_PATH)
    return ds
ds = get_or_create_datasource('<datasource_name>')
This handy function will try to get a Data Engine Datasource with the given name. If it does not exist, it will create one. This function is safe to call multiple times.
2. Enriching the Datasource¶
The first time through our data, we want to create our train/validation/test splits. We can do this directly in our Datasource's metadata, which will keep our data points assigned to the same splits each time we run.
We use the following function, which operates on a pandas DataFrame, to create the splits and assign them to a new column:
import pandas as pd
from sklearn.model_selection import train_test_split

def create_splits(df, train=0.6, valid=0.3, test=0.1):
    # normalize the ratios so they sum to 1.0
    total = train + valid + test
    train /= total
    valid /= total
    test /= total

    # create the training DataFrame and the rest
    train_df, rest_df = train_test_split(df, test_size=valid + test)

    # renormalize the remaining ratios
    total = valid + test
    valid /= total
    test /= total

    # create the validation and test DataFrames
    valid_df, test_df = train_test_split(rest_df, test_size=test)

    train_df['split'] = 'train'
    valid_df['split'] = 'valid'
    test_df['split'] = 'test'

    # combine the three split DataFrames into one and return
    return pd.concat([train_df, valid_df, test_df], ignore_index=True)
Since Data Engine Datasources can be converted to pandas DataFrames, we can run the following to create our splits:
# get all data points from our Datasource and convert to a pandas DataFrame
md = ds.all().dataframe
# create the train/valid/test splits for our data
md = create_splits(md, train=0.6, valid=0.3, test=0.1)
The next step of the enrichment process is to add any current annotations we have. For this we use the pandas.DataFrame.apply function to apply our conversion function to each row of the DataFrame that was created from our Datasource:
def create_metadata(row):
    # convert the annotation to the Data Engine format and assign it to the `annotation` column
    row['annotation'] = ...
    return row

# apply the `create_metadata` function to each row of our metadata DataFrame
enriched_md = md.apply(create_metadata, axis=1)
Converting Annotations to Data Engine format
The Tooth Fairy repo has examples of functions that convert from COCO to the Data Engine format and from YOLO to the Data Engine format. You can use these as a basis for writing custom converters for your data.
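Purely as an illustration, a YOLO-style bounding box converter might look roughly like the sketch below. It assumes a Label Studio-compatible result layout; the `from_name`/`to_name` values and the `labels` list are hypothetical, so treat the Tooth Fairy converters as the reference implementation.
def yolo_box_to_result(yolo_line, labels, from_name='label', to_name='image'):
    # a YOLO line looks like: "<class_id> <x_center> <y_center> <width> <height>", normalized to 0-1
    class_id, xc, yc, w, h = yolo_line.split()
    xc, yc, w, h = (float(v) for v in (xc, yc, w, h))

    # Label Studio rectangles use the top-left corner and size, expressed as percentages
    return {
        'type': 'rectanglelabels',
        'from_name': from_name,
        'to_name': to_name,
        'value': {
            'x': (xc - w / 2) * 100,
            'y': (yc - h / 2) * 100,
            'width': w * 100,
            'height': h * 100,
            'rectanglelabels': [labels[int(class_id)]],
        },
    }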
Finally, we need to upload our metadata to DagsHub:
# set the batch size for uploads
dagshub.common.config.dataengine_metadata_upload_batch_size = 50
# upload the enriched metadata to DagsHub
ds.upload_metadata_from_dataframe(enriched_md, path_column="path")
3. Filtering the Datasource¶
After creating our splits and adding any annotations we have, we can now filter our Datasource based on these metadata.
# filter Datasource to only include annotated data
labeled = ds[ds['annotation'].is_not_null()]
# filter Datasource into train and valid splits
train = labeled[labeled['split'] == 'train']
valid = labeled[labeled['split'] == 'valid']
4. Training a model¶
Using the training and validation splits, we can create DataLoaders for them and use them directly in training. While the input data can automatically be converted to tensors, we need to write a conversion function to handle the annotations.
import torch

def annotation_to_tensor(annotation_file: str) -> torch.Tensor:
    # logic to read the annotation file and convert it to a `torch.Tensor`
    ...
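As a hypothetical example only, if each annotation file stored a JSON list of labeled bounding boxes (an assumed layout, not necessarily the one used in the Tooth Fairy repo), the conversion could look like this:
import json
import torch

def annotation_to_tensor(annotation_file: str) -> torch.Tensor:
    # assumed layout: [{"label": 3, "bbox": [x1, y1, x2, y2]}, ...]
    with open(annotation_file) as f:
        boxes = json.load(f)

    # one row per box: [label, x1, y1, x2, y2]
    return torch.tensor([[b['label'], *b['bbox']] for b in boxes], dtype=torch.float32)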
With this in place, we can then create our DataLoaders from our train and valid query results.
# load our model
model = ...

# convert the file hash into a full file path that stores the annotations
# this makes it easier to read the file in the `annotation_to_tensor` function
train = train.all().get_blob_fields('annotation')
valid = valid.all().get_blob_fields('annotation')

train_loader = train.as_ml_dataloader(flavor='torch',
                                      metadata_columns=['annotation'],
                                      # `tensorizers` needs one more value than `metadata_columns`
                                      # because the 'path' column for our input is automatically included
                                      tensorizers=['image', annotation_to_tensor])

valid_loader = valid.as_ml_dataloader(flavor='torch',
                                      metadata_columns=['annotation'],
                                      tensorizers=['image', annotation_to_tensor])
best_valid_loss = 1_000_000.

# start an MLflow run context
with mlflow.start_run():
    for epoch in range(EPOCHS):
        # ensure the model is in training mode
        model.train(True)

        # run a single epoch using a helper function
        train_loss = train_one_epoch(model, train_loader)

        # put the model into evaluation mode
        model.eval()
        valid_loss = calculate_validation_loss(model, valid_loader)

        # log the training metrics to the MLflow server
        mlflow.log_metrics({'train_loss': train_loss, 'valid_loss': valid_loss}, step=epoch)

        # save the model if it has the lowest validation loss
        if valid_loss < best_valid_loss:
            save_model(model)
            best_valid_loss = valid_loss

    # log the best model weights to the MLflow server
    mlflow.log_artifact('path/to/best/model.pt')
This example training loop shows the most important aspects of using Data Engine to train a model. Functions such as train_one_epoch and calculate_validation_loss will likely be custom functions for your project.
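For a PyTorch model they might be implemented roughly as sketched below. The loss function, the optimizer, and the assumption that the dataloader yields (input, target) tensor pairs are all placeholders; adapt them to your task.
import torch

# assumed to exist alongside the model; the loss and optimizer choices are placeholders
loss_fn = torch.nn.functional.cross_entropy
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)

def train_one_epoch(model, loader):
    # assumes the loader yields (input, target) pairs of tensors
    total_loss = 0.0
    for inputs, targets in loader:
        optimizer.zero_grad()
        loss = loss_fn(model(inputs), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

def calculate_validation_loss(model, loader):
    total_loss = 0.0
    with torch.no_grad():
        for inputs, targets in loader:
            total_loss += loss_fn(model(inputs), targets).item()
    return total_loss / len(loader)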
5. Filtering the Datasource for unlabeled data¶
After a model has been trained, we need to use that model to run predictions on all of the unlabeled data. These predictions will be used in the next step to determine which data needs to be labeled for the next active learning cycle.
unlabeled = ds[ds['annotation'].is_null()]
6. Running inference on unlabeled data¶
When running inference, we can use the as_ml_dataloader function again to turn our unlabeled data into tensors we can feed directly into our model.
dataloader = unlabeled.all().as_ml_dataloader(flavor='torch')
After this we can run each datapoint through the model to get a prediction and an overall confidence score.
# get a DataFrame for the metadata of the unlabeled data points
md = unlabeled.all().dataframe

results = {}

# iterate over the dataloader and the 'path' column of the metadata
for data, path in zip(dataloader, md['path']):
    # run the model
    out = model(data)

    # convert the output to the Data Engine format and a confidence score
    prediction, score = convert_to_de(out)

    # store the prediction and score in the `results` dictionary
    results[path] = (prediction, score)
Converting to Data Engine format
The Tooth Fairy repo has examples of functions that convert from COCO to the Data Engine format and from YOLO to the Data Engine format. You can use these as a basis for writing custom converters for your data.
During active learning, you need to calculate a single score for each data point. This score should be an indication of how difficult the data point was for the model to predict.
How you calculate a score for each data point will be project dependent. There are simple calculations such as:
- Lowest confidence - the score is the lowest confidence of all detected objects
- Average confidence - average of all confidences of detected objects
- Minimizing confidence delta - difference between confidences for the top two labels of an object
But there are much more complex algorithms such as:
- Maximizing entropy - entropy measures how evenly the model's confidence is spread across all categories
- Predicting loss - predicting the loss of the sample, were the ground truth known
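For the simpler calculations, a minimal sketch might look like the following. It assumes you already have per-object confidence values from your model; the function names are illustrative, and all three conventions keep "lower score = harder sample", matching the selection logic in step 8.
def lowest_confidence_score(detection_confidences):
    # the score is the confidence of the least certain detected object
    return min(detection_confidences)

def average_confidence_score(detection_confidences):
    # the score is the mean confidence over all detected objects
    return sum(detection_confidences) / len(detection_confidences)

def confidence_delta_score(top_two_label_confidences):
    # the score is the gap between the top two label confidences for an object;
    # a small gap means the model was torn between two labels
    first, second = top_two_label_confidences
    return first - second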
7. Enriching the Datasource with predictions¶
Once we have our results, we can update our metadata to include the predictions and the scores we just calculated.
def pred_to_metadata(row, results):
    path = row['path']
    if path not in results:
        return row

    prediction, score = results[path]

    row['prediction'] = prediction
    row['pred_score'] = score

    return row

enriched_unlabeled = md.apply(lambda x: pred_to_metadata(x, results), axis=1)
Then we upload the newly enriched metadata to DagsHub:
dagshub.common.config.dataengine_metadata_upload_batch_size = 50
ds.upload_metadata_from_dataframe(enriched_unlabeled, path_column="path")
8. Sorting Datasource by score¶
Using the score we just added to the metadata, we want to choose MAX_IMAGES samples for our annotators, where MAX_IMAGES depends on:
- the number of images you want to annotate per active learning cycle
- the speed of your annotators
- the availability of your annotators
We may also want to threshold our selected images by a MAX_SCORE. The model may not learn anything new from images it was already very confident about.
Let's start by querying our Datasource for all unlabeled, predicted data points:
unlabeled = ds[ds['annotation'].is_null()]
predicted = unlabeled[unlabeled['pred_score'].is_not_null()]
Next we determine the threshold score that will give us at most MAX_IMAGES images.
# get all scores as a list
scores = predicted.all().dataframe['pred_score'].tolist()
# sort the scores
scores = sorted(scores)
# remove all scores above our maximum allowed score
scores = [score for score in scores if score <= MAX_SCORE]
# determine which score is our threshold
threshold_score = scores[:MAX_IMAGES][-1]
Using this threshold_score, we can filter our Datasource further into the data points we want to label:
to_label = predicted[predicted['pred_score'] <= threshold_score]
9. Creating a Label Studio project¶
Calling annotate on our filtered Datasource will help us create a Label Studio project with the data points in the Datasource.
This function returns a link to create the Label Studio project. You will need to:
- Provide a name for the Label Studio project (which you need to note for later)
- Decide whether or not to copy the project config from another Label Studio project. If you do not, or cannot, do this, you will have to manually configure the project later by providing the annotation type and the correct label names.
url = to_label.annotate()
For more information, check out our documentation on annotating data.
10. Sending predictions to Label Studio¶
We can send our model's predictions for our data points to Label Studio. This can help speed up your annotators, since they will have a starting point they can make corrections to.
To do so, we need to use the Label Studio SDK to get an instance of a Label Studio client:
from label_studio_sdk import Client
def label_studio_client():
    # build the appropriate URL for the Label Studio API endpoint
    url = f'https://{DAGSHUB_USER}:{dagshub.auth.get_token()}@dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}/annotations/de'

    # create a Label Studio client
    ls = Client(url=url, api_key=dagshub.auth.get_token())

    return ls
ls = label_studio_client()
Next, we need to get access to the project we just created on Label Studio. To do this, we can loop through all projects and stop when we find the one with the right name:
for proj in ls.list_projects():
    if proj.params['title'] == LS_PROJ_NAME_FROM_STEP_9:
        break
Now that our project is available in the proj variable, we need to get all available Tasks. A Task in Label Studio is a data point to be labeled along with some metadata about it.
We can use this helper function to get all tasks for the project:
import requests

def get_tasks(ls, proj):
    # build the API endpoint to get the tasks
    url = ls.get_url('/api/tasks')

    # set the project ID as a query parameter
    query = {'project': proj.id}

    # execute the request
    res = requests.request(method='GET', url=url, params=query)
    if res.status_code == 200:
        return res.json()['tasks']

    return []
tasks = get_tasks(ls, proj)
The Tasks include an ID, which is required when sending predictions back to the Label Studio server. Since our Data Engine metadata includes the URL for each data point, we can create a map from URL to Task ID:
url_to_taskid = {task['data']['image']: task['id'] for task in tasks}
Since our prediction metadata column is JSON-formatted data stored as a binary BLOB, we need to make sure we have the data loaded into memory:
predictions = to_label.all()
predictions.get_blob_fields('prediction', load_into_memory=True)
Our Data Engine prediction and annotation format is compatible with the Label Studio format. However, to send the Task predictions the Label Studio API needs two more pieces of information:
- a prediction score
- the Task ID
Through good planning, we have all of that information on hand. To make things simpler, let's put it into a new list called ls_preds:
import json

# list of predictions to be sent to Label Studio
ls_preds = []

# loop through all our data points that are to be labeled
for pred in predictions:
    # convert the binary BLOB to JSON
    pred_json = json.loads(pred['prediction'].decode())

    # get the URL and prediction score from the metadata
    url = pred['dagshub_download_url']
    score = pred['pred_score']

    # look up the Task ID for this data point
    task_id = url_to_taskid[url]

    # append a dictionary of the info Label Studio needs
    ls_preds.append({
        'result': pred_json['prediction'][0]['result'],
        'score': score,
        'task': task_id
    })
All that's left to do is upload our Task predictions. We use the backoff Python module to retry any failed upload attempts with an exponential backoff schedule.
import backoff

@backoff.on_exception(backoff.expo, ConnectionError, max_tries=8)
def create_ls_prediction(proj, pred):
    # send the prediction to the Label Studio API through the SDK
    proj.create_prediction(pred['task'], pred['result'], pred['score'])

# loop through all task predictions
for ls_pred in ls_preds:
    try:
        # call the function to create predictions with exponential backoff
        create_ls_prediction(proj, ls_pred)
    except:
        print(f"ERROR: prediction for task ({ls_pred['task']}) was not successfully created")
11. Correcting and saving Label Studio annotations¶
At this point, you can open your Label Studio project in your favorite browser. The URL for this project can be created programmatically:
proj_id = proj.id
url = f'https://dagshub.com/{DAGSHUB_USER}/{DAGSHUB_REPO}/annotations/de/dagshub/projects/{proj_id}/data'
Once you go to that URL, you will be presented with the Label Studio UI. The data points selected for annotation will be available in the Label Studio project and the predictions should be included.
You or your annotator can either modify the predictions to make them more accurate, or remove them and create annotations from scratch. When you click the Save button, any predictions and changes made will automatically be saved to the annotation column of your Datasource's metadata. This makes them available for training in the next active learning cycle.
For more information on using Label Studio with Data Engine, check out our documentation on annotating data.
12. GOTO step 3¶
At this point, you're ready to train a new model, which means you will start a new active learning cycle.
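Conceptually, the whole pipeline boils down to a loop over the numbered steps above. The sketch below is purely schematic: the helper function names are placeholders standing in for the code shown in this guide, with steps 1-2 running once and steps 3-11 repeating each cycle.
# steps 1-2: get the Datasource and enrich it with splits and existing annotations
ds = get_or_create_datasource('<datasource_name>')

while True:
    labeled = ds[ds['annotation'].is_not_null()]       # step 3: filter for labeled data
    model = train_model(labeled)                        # step 4: train and log to MLflow
    unlabeled = ds[ds['annotation'].is_null()]          # step 5: filter for unlabeled data
    run_inference_and_upload_scores(model, unlabeled)   # steps 6-7: predict and enrich metadata
    to_label = select_samples_to_label(ds)              # step 8: pick the hardest samples
    send_to_label_studio(to_label)                      # steps 9-10: create project, send predictions
    wait_for_annotations()                              # step 11: annotators correct and save labels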