Pipeline Tutorial with HeteroSecureBoost¶
Install¶
Pipeline is distributed along with fate_client.
pip install fate_client
To use Pipeline, we first need to specify which FATE Flow Service to connect to. Once fate_client is installed, a command-line entry point named pipeline becomes available:
!pipeline --help
Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init    - DESCRIPTION: Pipeline Config Command.
Assume we have a FATE Flow Service at 127.0.0.1:9380 (the default in standalone mode); then execute:
!pipeline init --ip 127.0.0.1 --port 9380
Pipeline configuration succeeded.
Hetero SecureBoost Example¶
Before starting a modeling task, the data to be used should be uploaded. Please refer to this guide.
The pipeline package provides components to compose a FATE pipeline.
from pipeline.backend.pipeline import PipeLine
from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation
from pipeline.interface import Data
Make a pipeline instance:
- initiator:
* role: guest
* party: 9999
- roles:
* guest: 9999
* host: 10000
pipeline = PipeLine() \
.set_initiator(role='guest', party_id=9999) \
.set_roles(guest=9999, host=10000)
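The initiator and role settings above boil down to a small mapping of roles to party ids. As an illustration only (a simplified sketch, not the exact job configuration Pipeline generates), the same information can be captured in plain Python:

```python
# Hypothetical helper: collect initiator and role info into a dict,
# similar in spirit to the job configuration Pipeline builds internally.
def build_role_conf(initiator_role, initiator_party, guest, host):
    """Party ids are normalized to lists, since a role may have several parties."""
    as_list = lambda v: v if isinstance(v, list) else [v]
    return {
        "initiator": {"role": initiator_role, "party_id": initiator_party},
        "role": {"guest": as_list(guest), "host": as_list(host)},
    }

conf = build_role_conf("guest", 9999, guest=9999, host=10000)
```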
Define a Reader component to load data:
reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
table={"name": "breast_hetero_guest", "namespace": "experiment"})
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
table={"name": "breast_hetero_host", "namespace": "experiment"})
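Each party points the same Reader at a different local table. As an illustrative sketch only (not Pipeline's internal representation), the per-party setup above amounts to a mapping from (role, party_id) to a table reference:

```python
# Hypothetical mapping of each party to the table it will read.
# Table names and the "experiment" namespace come from the upload step.
reader_tables = {
    ("guest", 9999): {"name": "breast_hetero_guest", "namespace": "experiment"},
    ("host", 10000): {"name": "breast_hetero_host", "namespace": "experiment"},
}

def table_for(role, party_id):
    """Look up which table a given party's reader instance will load."""
    return reader_tables[(role, party_id)]
```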
Add a DataTransform component to parse raw data into Data Instances:
data_transform_0 = DataTransform(name="data_transform_0")
# set guest parameter
data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(
with_label=True)
data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(
with_label=False)
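Conceptually, data transform parses each delimited raw row into a sample id, an optional label, and a feature vector; only the guest holds labels (with_label=True). A minimal sketch, assuming a comma-separated format with the label in the second column on the guest side:

```python
# Illustrative sketch only: parse one raw row into (id, label, features).
# The exact input format is an assumption for this example.
def parse_row(line, with_label):
    parts = line.strip().split(",")
    sample_id = parts[0]
    if with_label:
        # Guest side: second column is the binary label.
        label = int(parts[1])
        features = [float(x) for x in parts[2:]]
    else:
        # Host side: no label column, only features.
        label = None
        features = [float(x) for x in parts[1:]]
    return sample_id, label, features

guest_row = parse_row("133,1,0.254,0.721", with_label=True)
host_row = parse_row("133,0.618,0.113", with_label=False)
```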
Add an Intersection component to perform PSI (private set intersection) for the hetero scenario:
intersect_0 = Intersection(name="intersect_0")
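To illustrate the *outcome* of PSI: after intersection, both sides keep only the sample ids they share. The toy sketch below uses plain hashing and set intersection; real PSI protocols achieve the same result cryptographically, without revealing non-overlapping ids the way this toy version would.

```python
import hashlib

# Toy illustration of what intersection produces; NOT a secure protocol.
def hash_id(x):
    return hashlib.sha256(x.encode()).hexdigest()

guest_ids = {"a", "b", "c", "d"}
host_ids = {"b", "d", "e"}

# Intersect hashed ids, then map back to raw ids on the guest side.
shared = {hash_id(i) for i in guest_ids} & {hash_id(i) for i in host_ids}
overlap = sorted(i for i in guest_ids if hash_id(i) in shared)
```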
Now, we define the HeteroSecureBoost component. The following parameters will be set for all parties involved:
hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
num_trees=5,
bin_num=16,
task_type="classification",
objective_param={"objective": "cross_entropy"},
encrypt_param={"method": "paillier"},
tree_param={"max_depth": 3})
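In gradient boosting with the cross_entropy objective, each tree is fit to first- and second-order gradients of the loss with respect to the current raw score. A minimal sketch of those gradients for binary classification (standard GBDT math, not FATE's encrypted implementation):

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def cross_entropy_grad_hess(y_true, raw_score):
    """Gradients of binary cross-entropy w.r.t. the raw (pre-sigmoid) score."""
    p = sigmoid(raw_score)
    grad = p - y_true        # first-order gradient
    hess = p * (1.0 - p)     # second-order gradient
    return grad, hess

# At raw score 0 the predicted probability is 0.5.
g, h = cross_entropy_grad_hess(1, 0.0)
```

In the federated setting these gradients are computed on the guest (label holder) and shared with hosts under encryption, which is why encrypt_param is set above.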
To show the evaluation result, an Evaluation component is needed:
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
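One of the standard metrics reported for binary evaluation is AUC. As a minimal sketch (a pairwise-comparison implementation, O(n_pos * n_neg), used here only to illustrate the metric):

```python
def auc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly."""
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    # A correctly ordered pair counts 1; a tie counts 0.5.
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

score = auc([1, 0, 1, 0], [0.9, 0.2, 0.7, 0.4])
```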
Add components to the pipeline, in order of execution:
- data_transform_0 consumes reader_0's output data
- intersect_0 consumes data_transform_0's output data
- hetero_secureboost_0 consumes intersect_0's output data
- evaluation_0 consumes hetero_secureboost_0's prediction result on training data
Then compile the pipeline to make it ready for submission.
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))
pipeline.compile();
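The add_component calls define a DAG of components, and compilation requires that it can be executed in dependency order. As an illustration of that ordering (a generic topological sort via Kahn's algorithm, not Pipeline's actual scheduler):

```python
from collections import deque

# Edges of the pipeline DAG built above: component -> downstream consumers.
edges = {
    "reader_0": ["data_transform_0"],
    "data_transform_0": ["intersect_0"],
    "intersect_0": ["hetero_secureboost_0"],
    "hetero_secureboost_0": ["evaluation_0"],
    "evaluation_0": [],
}

def topo_order(edges):
    """Return components in an order where every dependency runs first."""
    indeg = {n: 0 for n in edges}
    for n in edges:
        for m in edges[n]:
            indeg[m] += 1
    queue = deque(n for n, d in indeg.items() if d == 0)
    order = []
    while queue:
        n = queue.popleft()
        order.append(n)
        for m in edges[n]:
            indeg[m] -= 1
            if indeg[m] == 0:
                queue.append(m)
    return order

order = topo_order(edges)
```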
Now, submit (fit) the pipeline:
pipeline.fit()
2021-12-31 03:24:22.633 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310324182459270
2021-12-31 03:24:23.152 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:00
2021-12-31 03:24:27.861 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component reader_0, time elapse: 0:00:05
2021-12-31 03:24:34.732 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component data_transform_0, time elapse: 0:00:12
2021-12-31 03:24:43.185 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component intersect_0, time elapse: 0:00:20
2021-12-31 03:25:18.915 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component hetero_secureboost_0, time elapse: 0:00:56
2021-12-31 03:25:27.867 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component evaluation_0, time elapse: 0:01:05
2021-12-31 03:25:31.005 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310324182459270
2021-12-31 03:25:31.007 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:01:08
Once training is done, the trained model may be used for prediction. Optionally, save the trained pipeline for future use.
pipeline.dump("pipeline_saved.pkl");
First, deploy the needed components from the training pipeline:
pipeline = PipeLine.load_model_from_file('pipeline_saved.pkl')
pipeline.deploy_component([pipeline.data_transform_0, pipeline.intersect_0, pipeline.hetero_secureboost_0]);
Define a new Reader component for reading prediction data:
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role="guest", party_id=9999).component_param(table={"name": "breast_hetero_guest", "namespace": "experiment"})
reader_1.get_party_instance(role="host", party_id=10000).component_param(table={"name": "breast_hetero_host", "namespace": "experiment"})
Optionally, define a new Evaluation component:
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
Add components to the predict pipeline, in order of execution:
predict_pipeline = PipeLine()
predict_pipeline.add_component(reader_1)\
.add_component(pipeline,
data=Data(predict_input={pipeline.data_transform_0.input.data: reader_1.output.data}))\
.add_component(evaluation_0, data=Data(data=pipeline.hetero_secureboost_0.output.data));
Then, run the prediction job:
predict_pipeline.predict()
2021-12-31 03:25:35.541 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:123 - Job id is 202112310325328444510
2021-12-31 03:25:47.384 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:144 - Job is still waiting, time elapse: 0:00:11
2021-12-31 03:25:52.078 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component reader_1, time elapse: 0:00:16
2021-12-31 03:25:58.545 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component data_transform_0, time elapse: 0:00:23
2021-12-31 03:26:07.502 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component intersect_0, time elapse: 0:00:31
2021-12-31 03:26:18.580 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component hetero_secureboost_0, time elapse: 0:00:43
2021-12-31 03:26:25.480 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:177 - Running component evaluation_0, time elapse: 0:00:49
2021-12-31 03:26:28.136 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:131 - Job is success!!! Job id is 202112310325328444510
2021-12-31 03:26:28.143 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:132 - Total time: 0:00:52
For more demos on using Pipeline to submit jobs, please refer to the pipeline demos.