Federated classic CTR model training on Criteo¶
In this tutorial, we show you how to develop a horizontal federated recommendation model. We use the third-party library torch-rechub to call some classic recommendation models such as FM, DeepFM, etc., and use it to build a federated task in FATE. In terms of the dataset, we use the Criteo dataset which has been sampled and preprocessed, with a total of 50K data. For convenience, in this tutorial, all clients use the same dataset.
Install Torch-rechub¶
We will use torch-rechub from: https://github.com/datawhalechina/torch-rechub. To run this tutorial, you need to install the latest version:
- git clone https://github.com/datawhalechina/torch-rechub.git
- cd torch-rechub
- python setup.py install
The Criteo¶
The Criteo dataset is an online advertising dataset released by Criteo Labs. It contains millions of records of click feedback for displayed ads, which can be used as a benchmark for click-through rate (CTR) prediction. The dataset has 40 features, with the first column being the label, where a value of 1 indicates that the ad was clicked and a value of 0 indicates that the ad was not clicked. The other features include 13 dense features and 26 sparse features.
- Download the sampled data used in this tutorial here:¶
https://webank-ai-1251170195.cos.ap-guangzhou.myqcloud.com/fate/examples/data/criteo.csv
And then put the csv under /examples/data.
- The origin source of the criteo dataset is: https://ailab.criteo.com/ressources/
Construct dataset class¶
Here we will construct a Criteo dataset class that reads the csv. At the same time, it will process the dataset and return information about the dense & sparse features.
from pipeline.component.nn import save_to_fate
%%save_to_fate dataset criteo_dataset.py
from sklearn.preprocessing import MinMaxScaler, LabelEncoder
from tqdm import tqdm
import pandas as pd
import numpy as np
from federatedml.nn.dataset.base import Dataset
from torch_rechub.basic.features import DenseFeature, SparseFeature
class CriteoDataset(Dataset):
    """FATE dataset wrapper for the (sampled) Criteo CTR data.

    Reads a csv (optionally gzip-compressed), discretizes and min-max-scales
    the dense features, label-encodes the sparse features, and exposes the
    processed frame through the map-style dataset interface. ``load`` also
    returns the torch-rechub feature definitions together with plain-dict
    descriptions of them, so they can be serialized and passed to the models
    as constructor parameters.
    """

    def __init__(self):
        super(CriteoDataset, self).__init__()
        self.x = None           # processed feature DataFrame
        self.y = None           # label Series
        self.sample_ids = None  # optional ids taken from a 'sid' column

    def load(self, data_path):
        """Load and preprocess the csv at ``data_path``.

        Returns an 8-tuple:
        (dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict,
         ffm_linear_feas, ffm_linear_feas_dict,
         ffm_cross_feas, ffm_cross_feas_dict)
        where each ``*_feas`` is a list of torch-rechub Feature objects and
        each ``*_dict`` is the matching list of plain-dict descriptions.
        """

        def convert_numeric_feature(val):
            # Log-bucket large values and shift small ones so the result can
            # serve as a categorical id for the derived "<name>_cat" column.
            v = int(val)
            if v > 2:
                return int(np.log(v) ** 2)
            else:
                return v - 2

        def make_sparse_feat(name, vocab_size, embed_dim):
            # Build a SparseFeature plus a serializable description of it.
            # (Shared by the fm / ffm-linear / ffm-cross feature lists, which
            # previously duplicated this factory three times.)
            feat = SparseFeature(name, vocab_size, embed_dim)
            desc = {'name': name, 'vocab_size': vocab_size, 'embed_dim': embed_dim}
            return feat, desc

        # load data (gzip-compressed csv is supported)
        if data_path.endswith(".gz"):
            data = pd.read_csv(data_path, compression="gzip")
        else:
            data = pd.read_csv(data_path)
        print("data load finished")

        if 'sid' in data.columns:
            self.sample_ids = list(data.sid)

        # feature columns follow the Criteo naming: I* dense, C* sparse
        dense_features = [f for f in data.columns.tolist() if f[0] == "I"]
        sparse_features = [f for f in data.columns.tolist() if f[0] == "C"]
        data[sparse_features] = data[sparse_features].fillna('0')
        data[dense_features] = data[dense_features].fillna(0)

        # discretize each dense feature into an extra sparse feature
        for feat in tqdm(dense_features):
            sparse_features.append(feat + "_cat")
            data[feat + "_cat"] = data[feat].apply(lambda x: convert_numeric_feature(x))

        # scale the (original) dense features
        sca = MinMaxScaler()
        data[dense_features] = sca.fit_transform(data[dense_features])

        # encode sparse features as contiguous integer ids
        for feat in tqdm(sparse_features):
            lbe = LabelEncoder()
            data[feat] = lbe.fit_transform(data[feat])

        dense_feas, dense_feas_dict = [], []
        for feature_name in dense_features:
            dense_feas.append(DenseFeature(feature_name))
            dense_feas_dict.append({'name': feature_name})

        sparse_feas, sparse_feas_dict = [], []
        for feature_name in sparse_features:
            a, b = make_sparse_feat(name=feature_name,
                                    vocab_size=data[feature_name].nunique(),
                                    embed_dim=16)
            sparse_feas.append(a)
            sparse_feas_dict.append(b)

        # FFM linear part: one scalar weight per feature value (embed_dim=1)
        ffm_linear_feas, ffm_linear_feas_dict = [], []
        for feature in sparse_feas:
            a, b = make_sparse_feat(name=feature.name,
                                    vocab_size=feature.vocab_size,
                                    embed_dim=1)
            ffm_linear_feas.append(a)
            ffm_linear_feas_dict.append(b)

        # FFM cross part: field-aware embeddings, so each vocabulary is
        # replicated once per field (see FFMModel's id-offset trick)
        ffm_cross_feas, ffm_cross_feas_dict = [], []
        for feature in sparse_feas:
            a, b = make_sparse_feat(name=feature.name,
                                    vocab_size=feature.vocab_size * len(sparse_feas),
                                    embed_dim=10)
            ffm_cross_feas.append(a)
            ffm_cross_feas_dict.append(b)

        y = data["label"]
        del data["label"]
        self.x = data
        self.y = y
        return dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict, \
            ffm_linear_feas, ffm_linear_feas_dict, ffm_cross_feas, ffm_cross_feas_dict

    def __getitem__(self, index):
        # one sample: ({feature_name: value, ...}, float32 label)
        return {k: v[index] for k, v in self.x.items()}, self.y[index].astype(np.float32)

    def __len__(self):
        return len(self.y)

    def get_sample_ids(self,):
        return self.sample_ids
CTR Models¶
Here based on Torch-rechub, we develop four kinds of CTR models: FM, FFM, DeepFM and DeepFFM
%%save_to_fate model fm_model.py
import torch
from torch_rechub.basic.layers import FM,LR,EmbeddingLayer
from torch_rechub.basic.features import DenseFeature, SparseFeature
class FMModel(torch.nn.Module):
    """A standard factorization machine: a first-order linear term plus
    second-order pairwise interactions over the sparse-feature embeddings.

    Feature definitions arrive as plain dicts (produced by the dataset) and
    are rebuilt here into torch-rechub Feature objects.
    """

    def __init__(self, dense_feas_dict, sparse_feas_dict):
        super(FMModel, self).__init__()
        # rebuild feature objects from their serialized descriptions
        self.dense_features = [DenseFeature(name=d['name']) for d in dense_feas_dict]
        self.sparse_features = [
            SparseFeature(d['name'], d['vocab_size'], d['embed_dim'])
            for d in sparse_feas_dict
        ]
        self.fm_features = self.sparse_features
        self.fm_dims = sum(fea.embed_dim for fea in self.fm_features)
        self.linear = LR(self.fm_dims)   # 1st-order interaction
        self.fm = FM(reduce_sum=True)    # 2nd-order interaction
        self.embedding = EmbeddingLayer(self.fm_features)

    def forward(self, x):
        # embedded: [batch_size, num_fields, embed_dim]
        embedded = self.embedding(x, self.fm_features, squeeze_dim=False)
        first_order = self.linear(embedded.flatten(start_dim=1))
        second_order = self.fm(embedded)  # [batch_size, 1]
        logit = first_order + second_order
        return torch.sigmoid(logit.squeeze(1))
%%save_to_fate model deepfm_model.py
import torch
from torch_rechub.basic.layers import FM,LR,EmbeddingLayer,MLP
from torch_rechub.basic.features import DenseFeature, SparseFeature
class DeepFMModel(torch.nn.Module):
    """Deep Factorization Machine.

    Combines a first-order linear term and an FM second-order term over the
    sparse features with an MLP over the dense features. Feature definitions
    arrive as plain dicts and are rebuilt into torch-rechub Feature objects.

    Args:
        dense_feas_dict (list): dict descriptions of dense features (deep part).
        sparse_feas_dict (list): dict descriptions of sparse features (fm part).
        mlp_params (dict): params of the final MLP module, keys include:
            `{"dims": list, "activation": str, "dropout": float, "output_layer": bool}`.
    """

    def __init__(self, dense_feas_dict, sparse_feas_dict, mlp_params):
        super(DeepFMModel, self).__init__()
        # rebuild feature objects from their serialized descriptions
        dense_features = [DenseFeature(name=d['name']) for d in dense_feas_dict]
        sparse_features = [
            SparseFeature(d['name'], d['vocab_size'], d['embed_dim'])
            for d in sparse_feas_dict
        ]
        self.dense_features = dense_features
        self.deep_features = dense_features
        self.fm_features = sparse_features
        self.deep_dims = sum(fea.embed_dim for fea in dense_features)
        self.fm_dims = sum(fea.embed_dim for fea in sparse_features)
        self.linear = LR(self.fm_dims)   # 1st-order interaction
        self.fm = FM(reduce_sum=True)    # 2nd-order interaction
        self.embedding = EmbeddingLayer(dense_features + sparse_features)
        self.mlp = MLP(self.deep_dims, **mlp_params)

    def forward(self, x):
        deep_in = self.embedding(x, self.deep_features, squeeze_dim=True)   # [batch_size, deep_dims]
        fm_in = self.embedding(x, self.fm_features, squeeze_dim=False)      # [batch_size, num_fields, embed_dim]
        logit = self.linear(fm_in.flatten(start_dim=1)) + self.fm(fm_in) + self.mlp(deep_in)
        return torch.sigmoid(logit.squeeze(1))
%%save_to_fate model ffm_model.py
import torch
from torch_rechub.basic.layers import LR,EmbeddingLayer,FFM,FM
import torch
from torch_rechub.basic.features import DenseFeature, SparseFeature
class FFMModel(torch.nn.Module):
    """Field-aware Factorization Machine (FFM).

    A first-order linear term plus field-aware second-order interactions:
    each sparse feature keeps a separate embedding per field, implemented by
    offsetting feature ids into a vocabulary replicated ``num_fields`` times
    (the dataset builds the cross features with ``vocab_size * num_fields``).
    Feature definitions are passed as plain dicts and rebuilt here.
    """
    def __init__(self, liner_feas_dict, cross_feas_dict):
        super(FFMModel, self).__init__()
        linear_features = []
        def recover_ffm_linear_feat(dict_):
            # Rebuild a SparseFeature from its serialized dict description.
            return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])
        for i in liner_feas_dict:
            linear_features.append(recover_ffm_linear_feat(i))
        cross_features = []
        def recover_ffm_cross_feat(dict_):
            # Same rebuild; cross features carry the field-replicated vocab size.
            return SparseFeature(dict_['name'], dict_['vocab_size'],dict_['embed_dim'])
        for i in cross_feas_dict:
            cross_features.append(recover_ffm_cross_feat(i))
        self.linear_features = linear_features
        self.cross_features = cross_features
        self.num_fields = len(cross_features)
        # number of unordered field pairs produced by the FFM layer
        self.num_field_cross = self.num_fields * (self.num_fields - 1) // 2
        self.ffm = FFM(num_fields=self.num_fields, reduce_sum=False)
        self.linear_embedding = EmbeddingLayer(linear_features)
        self.ffm_embedding = EmbeddingLayer(cross_features)
        self.b =torch.nn.Parameter(torch.zeros(1))  # global bias
        # per-field id offsets; registered as a buffer so it moves with the model
        fields_offset = torch.arange(0, self.num_fields, dtype=torch.long)
        self.register_buffer('fields_offset', fields_offset)
        self.fm = FM(reduce_sum=True)
    def forward(self, x):
        # first-order term: sum of per-feature linear embeddings -> [batch_size, 1]
        y_linear = self.linear_embedding(x, self.linear_features, squeeze_dim=True).sum(1, keepdim=True) #[batch_size, 1]
        # shift each feature id so that every field selects its own embedding copy
        x_ffm = {fea.name: x[fea.name].unsqueeze(1) * self.num_fields + self.fields_offset for fea in self.cross_features}
        # embedding output shape: [batch_size, num_fields, num_fields, embed_dim]
        input_ffm = self.ffm_embedding(x_ffm, self.cross_features, squeeze_dim=False)
        # pairwise field interactions: [batch_size, num_fields*(num_fields-1)/2, embed_dim]
        em = self.ffm(input_ffm)
        # reduce the interactions to one score per sample -> [batch_size, 1]
        y_ffm = self.fm(em)
        y = y_linear + y_ffm
        return torch.sigmoid(y.squeeze(1)+ self.b)
%%save_to_fate model deepffm_model.py
import torch
from torch import nn
from torch_rechub.basic.features import SparseFeature
from torch_rechub.models.ranking import DeepFFM
class DeepFFMModel(nn.Module):
    """Thin wrapper around torch-rechub's DeepFFM ranking model that rebuilds
    the linear/cross feature objects from their serialized dict descriptions."""

    def __init__(self, ffm_linear_feas_dict, ffm_cross_feas_dict):
        super().__init__()

        def rebuild(dicts):
            # Turn serialized descriptions back into SparseFeature objects.
            return [SparseFeature(d['name'], d['vocab_size'], d['embed_dim']) for d in dicts]

        self.model = DeepFFM(
            linear_features=rebuild(ffm_linear_feas_dict),
            cross_features=rebuild(ffm_cross_feas_dict),
            embed_dim=10,
            mlp_params={"dims": [1600, 1600], "dropout": 0.5, "activation": "relu"},
        )

    def forward(self, x):
        return self.model(x)
Local Testing¶
Test this set (dataset + model) locally to see if it can run, and omit the federated aggregation part during local validation.
# Local smoke test: load the dataset once and instantiate each CTR model.
import torch
from federatedml.nn.homo.trainer.fedavg_trainer import FedAVGTrainer
ds = CriteoDataset()
# path to the sampled criteo csv placed under examples/data
path = '../../../../examples/data/criteo.csv'
dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict, ffm_linear_feas, \
ffm_linear_feas_dict, ffm_cross_feas, ffm_cross_feas_dict = ds.load(path)
# build all four models from the serialized feature descriptions
fm_model = FMModel(dense_feas_dict=dense_feas_dict,sparse_feas_dict=sparse_feas_dict)
ffm_model = FFMModel(liner_feas_dict=ffm_linear_feas_dict, cross_feas_dict=ffm_cross_feas_dict)
dfm_model = DeepFMModel(dense_feas_dict=dense_feas_dict,sparse_feas_dict=sparse_feas_dict, mlp_params={"dims": [256, 128], "dropout": 0.2, "activation": "relu"})
dffm_model = DeepFFMModel(ffm_linear_feas_dict=ffm_linear_feas_dict,ffm_cross_feas_dict=ffm_cross_feas_dict)
# choose which model the local training below exercises
model = fm_model
ipcl_python failed to import ipcl_python failed to import
data load finished
100%|██████████| 26/26 [00:13<00:00, 1.87it/s] 100%|██████████| 52/52 [00:01<00:00, 40.50it/s]
# here we test one model: the FM Model
trainer = FedAVGTrainer(epochs=1, batch_size=256)
trainer.local_mode()  # skip federated aggregation; train purely locally
trainer.set_model(model)
# train one epoch on the dataset; no validation set is supplied
trainer.train(ds, None, optimizer=torch.optim.Adam(model.parameters(), lr=0.01), loss=torch.nn.BCELoss())
epoch is 0 100%|██████████| 1954/1954 [11:24<00:00, 2.86it/s] epoch loss is 1.3575431838684082
Submit a Pipeline to run a federation task¶
from federatedml.nn.dataset.criteo_dataset import CriteoDataset
import numpy as np
import pandas as pd
from pipeline.interface import Data, Model
from pipeline.component import Reader, Evaluation, DataTransform
from pipeline.backend.pipeline import PipeLine
from pipeline.component import HomoNN
from pipeline import fate_torch_hook
from torch import nn
import torch as t
import os
from pipeline.component.homo_nn import DatasetParam, TrainerParam
# patch torch so the FATE pipeline can capture torch modules/optimizers
fate_torch_hook(t)
fate_project_path = os.path.abspath('../../../../')
data_path = 'examples/data/criteo.csv'
# party ids of the federation participants (host doubles as arbiter)
host = 10000
guest = 9999
arbiter = 10000
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host, arbiter=arbiter)
data = {"name": "criteo", "namespace": "experiment"}
# register the local csv as a FATE table under name/namespace
pipeline.bind_table(name=data['name'],
                    namespace=data['namespace'], path=fate_project_path + '/' + data_path)
# reader: both parties read the same bound table (same data for convenience)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(
    role='guest', party_id=guest).component_param(table=data)
reader_0.get_party_instance(
    role='host', party_id=host).component_param(table=data)
# compute model parameter for our CTR models
ds = CriteoDataset()
dense_feas, dense_feas_dict, sparse_feas, sparse_feas_dict, ffm_linear_feas, ffm_linear_feas_dict, \
ffm_cross_feas, ffm_cross_feas_dict = ds.load(fate_project_path + '/' + data_path)
# wrap the saved FM model; the feature dicts become its constructor params
model = t.nn.Sequential(
    t.nn.CustModel(module_name='fm_model', class_name='FMModel', dense_feas_dict=dense_feas_dict, sparse_feas_dict=sparse_feas_dict)
)
nn_component = HomoNN(name='nn_0',
                      model=model,
                      loss=t.nn.BCELoss(),
                      optimizer=t.optim.Adam(
                          model.parameters(), lr=0.001, weight_decay=0.001),
                      # the dataset class saved earlier with %%save_to_fate
                      dataset=DatasetParam(dataset_name='criteo_dataset'),
                      trainer=TrainerParam(trainer_name='fedavg_trainer', epochs=1, batch_size=256, validation_freqs=1,
                                           data_loader_worker=6, shuffle=True),
                      torch_seed=100
                      )
# wire up: reader -> homo-nn training -> binary evaluation
pipeline.add_component(reader_0)
pipeline.add_component(nn_component, data=Data(train_data=reader_0.output.data))
pipeline.add_component(Evaluation(name='eval_0', eval_type='binary'), data=Data(data=nn_component.output.data))
pipeline.compile()
pipeline.fit()
data load finished
100%|██████████| 26/26 [00:13<00:00, 1.87it/s] 100%|██████████| 52/52 [00:01<00:00, 43.13it/s] 2022-12-27 10:52:35.371 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202212271052347495390 2022-12-27 10:52:35.523 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00 2022-12-27 10:52:36.535 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01 m2022-12-27 10:52:38.601 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 - 2022-12-27 10:52:38.603 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:03 2022-12-27 10:52:39.632 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:04 2022-12-27 10:52:40.768 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:05 2022-12-27 10:52:41.822 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:06 2022-12-27 10:52:42.864 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:07 2022-12-27 10:52:43.893 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:08 m2022-12-27 10:52:46.066 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 - 2022-12-27 10:52:46.067 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:10 2022-12-27 10:52:47.095 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:11 2022-12-27 10:52:48.134 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:12 2022-12-27 10:52:49.166 | 
INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:13 2022-12-27 10:52:50.251 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:14 2022-12-27 10:52:51.279 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:15 2022-12-27 10:52:52.323 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:16 2022-12-27 10:52:53.354 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:17 2022-12-27 10:52:54.438 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:19 2022-12-27 10:52:55.471 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:20 2022-12-27 10:52:56.500 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:21 2022-12-27 10:52:57.534 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:22 2022-12-27 10:52:58.597 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:23 2022-12-27 10:52:59.648 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:24 2022-12-27 10:53:00.723 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:25 2022-12-27 10:53:01.773 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:26 2022-12-27 10:53:02.796 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:27 2022-12-27 10:53:03.828 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:28 2022-12-27 10:53:04.855 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:29 2022-12-27 10:53:05.883 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:30 2022-12-27 10:53:06.919 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:31 2022-12-27 10:53:07.954 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:32 2022-12-27 10:53:08.994 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:33 2022-12-27 10:53:10.020 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:34 2022-12-27 10:53:11.055 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:35 2022-12-27 10:53:12.087 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:36 2022-12-27 10:53:13.113 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:37 2022-12-27 10:53:14.147 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:38 2022-12-27 10:53:15.174 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:39 2022-12-27 10:53:16.213 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:40 2022-12-27 10:53:17.250 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:41 2022-12-27 10:53:18.278 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:42 2022-12-27 10:53:19.312 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:43 2022-12-27 10:53:20.353 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:44 2022-12-27 10:53:21.378 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:46 2022-12-27 10:53:22.422 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:47 2022-12-27 10:53:23.445 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:48 2022-12-27 10:53:24.496 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:49 2022-12-27 10:53:25.537 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:50 2022-12-27 10:53:26.564 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:51 2022-12-27 10:53:27.928 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:52 2022-12-27 10:53:28.953 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:53 2022-12-27 10:53:30.031 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:54 2022-12-27 10:53:31.163 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:55 2022-12-27 10:53:32.275 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:56 2022-12-27 10:53:33.393 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:58 2022-12-27 10:53:34.495 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:00:59 2022-12-27 10:53:35.601 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:00 2022-12-27 10:53:36.694 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:01 2022-12-27 10:53:37.813 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:02 2022-12-27 10:53:38.924 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:03 2022-12-27 10:53:40.041 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:04 2022-12-27 10:53:41.127 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:05 2022-12-27 10:53:42.253 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:06 2022-12-27 10:53:43.335 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:07 2022-12-27 10:53:44.421 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:09 2022-12-27 10:53:45.519 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:10 2022-12-27 10:53:46.607 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:11 2022-12-27 10:53:47.737 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:12 2022-12-27 10:53:48.822 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:13 2022-12-27 10:53:49.924 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:14 2022-12-27 10:53:51.003 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:15 2022-12-27 10:53:52.108 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:16 2022-12-27 10:53:53.171 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:17 2022-12-27 10:53:54.271 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:18 2022-12-27 10:53:55.325 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:19 2022-12-27 10:53:56.423 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:21 2022-12-27 10:53:57.519 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:22 2022-12-27 10:53:58.600 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:23 2022-12-27 10:53:59.659 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:24 2022-12-27 10:54:00.743 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:25 2022-12-27 10:54:01.833 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:26 2022-12-27 10:54:02.946 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:27 2022-12-27 10:54:04.034 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:28 2022-12-27 10:54:05.112 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:29 2022-12-27 10:54:06.230 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:30 2022-12-27 10:54:07.290 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:31 2022-12-27 10:54:08.417 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:33 2022-12-27 10:54:09.600 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:34 2022-12-27 10:54:10.699 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:35 2022-12-27 10:54:11.811 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:36 2022-12-27 10:54:12.931 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:37 2022-12-27 10:54:14.383 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:39 2022-12-27 10:54:15.464 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:40 2022-12-27 10:54:16.560 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:41 2022-12-27 10:54:17.628 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:42 2022-12-27 10:54:18.701 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:43 2022-12-27 10:54:19.728 | INFO | 
pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:44 2022-12-27 10:54:20.762 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component nn_0, time elapse: 0:01:45 m2022-12-27 10:54:22.827 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:102 - Job is canceled, time elapse: 0:01:47