Pipeline Tutorial: Using Data with Recorded Meta¶
Starting with FATE v1.9.0, FATE supports using data with recorded meta. Using data with meta means that information such as "input_format" and "with_label" is set at the upload step. Please refer to FATE-Flow for more examples.
Install¶
Pipeline is distributed along with FATE-Client.
pip install fate_client
To use PipeLine, we first need to specify which FATE Flow Service to connect to. Once FATE-Client is installed, a command-line entry point named pipeline is available:
!pipeline --help
Usage: pipeline [OPTIONS] COMMAND [ARGS]...
Options:
  --help  Show this message and exit.
Commands:
  config  pipeline config tool
  init    - DESCRIPTION: Pipeline Config Command.
Assume we have a FATE Flow Service at 127.0.0.1:9380 (the default in standalone mode), then execute the following command to initialize PipeLine:
!pipeline init --ip 127.0.0.1 --port 9380
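Before running the init command, it can help to confirm that something is listening at that address. The sketch below is a plain TCP check using only the Python standard library. `is_port_open` is a hypothetical helper, not part of FATE-Client, and a successful connection only shows that the port accepts connections, not that the service behind it is FATE Flow.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within timeout."""
    try:
        # create_connection resolves the host and opens a TCP socket
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False
```

For the standalone default used in this tutorial, the call would be `is_port_open("127.0.0.1", 9380)`.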
Upload Data with Meta¶
Before starting a modeling job, data should be uploaded. Here we assume that the task is between two parties, guest and host, and runs in standalone mode. If you want to run in cluster mode, make sure that each party uploads its own data.
from pipeline.backend.pipeline import PipeLine
Make a pipeline instance:
- initiator:
  - role: guest
  - party: 9999
- roles:
  - guest: 9999
pipeline_upload = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999)
We will use "breast_hetero_guest" and "breast_hetero_host" from the example data to demonstrate how to upload data with meta.
Define data meta:
breast_hetero_guest_meta = {"delimiter": ",", "with_label": True, "label_name": "y",
                            "input_format": "dense", "data_type": "float64"}
breast_hetero_host_meta = {"delimiter": ",", "with_label": False,
                           "input_format": "dense", "data_type": "float64"}
breast_hetero_guest = {"name": "breast_hetero_guest_with_meta", "namespace": "experiment"}
breast_hetero_host = {"name": "breast_hetero_host_with_meta", "namespace": "experiment"}
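A small pre-flight check can catch typos in the meta before upload. The sketch below is not FATE code: `check_meta` and the `REQUIRED_KEYS` set are hypothetical, and the rule that `label_name` accompanies `with_label=True` is an assumption drawn only from the two meta dictionaries above.

```python
# Keys assumed to be required, taken from the example meta in this tutorial.
REQUIRED_KEYS = {"delimiter", "with_label", "input_format", "data_type"}


def check_meta(meta: dict) -> list:
    """Return a list of problems found in a meta dict (empty list means none)."""
    problems = [f"missing key: {key}" for key in sorted(REQUIRED_KEYS - meta.keys())]
    # Assumption: a labeled table should also name its label column.
    if meta.get("with_label") and "label_name" not in meta:
        problems.append("with_label is True but label_name is not set")
    return problems


guest_meta = {"delimiter": ",", "with_label": True, "label_name": "y",
              "input_format": "dense", "data_type": "float64"}
host_meta = {"delimiter": ",", "with_label": False,
             "input_format": "dense", "data_type": "float64"}

print(check_meta(guest_meta))
print(check_meta(host_meta))
```

Both example dictionaries pass this check. The actual validation applied at upload time is defined by FATE Flow and may differ.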
import os
# Replace this with the actual location where FATE is deployed
fate_project_base = "/data/projects/fate"  # $fate_project_base
pipeline_upload.add_upload_data(file=os.path.join(fate_project_base, "examples/data/breast_hetero_guest.csv"),
                                table_name=breast_hetero_guest["name"],
                                namespace=breast_hetero_guest["namespace"],
                                head=1, partition=4,
                                with_meta=True, meta=breast_hetero_guest_meta)  # with_meta=True means uploading data with meta
pipeline_upload.add_upload_data(file=os.path.join(fate_project_base, "examples/data/breast_hetero_host.csv"),
                                table_name=breast_hetero_host["name"],
                                namespace=breast_hetero_host["namespace"],
                                head=1, partition=4,
                                with_meta=True, meta=breast_hetero_host_meta)
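Because the meta names the label column, a mismatch between the meta and the file header is worth catching early. The following is a minimal sketch using the standard `csv` module. `label_in_header` is a hypothetical helper, and the sample rows are made up for illustration rather than taken from the real dataset.

```python
import csv
import io


def label_in_header(csv_text: str, meta: dict) -> bool:
    """Return True if the header row is consistent with the label settings in meta."""
    reader = csv.reader(io.StringIO(csv_text), delimiter=meta["delimiter"])
    header = next(reader)  # the first row is treated as the header
    if not meta.get("with_label"):
        return True  # nothing to check when no label is declared
    return meta.get("label_name") in header


# Made-up sample rows for illustration only.
sample = "id,y,x0,x1\n1,0,0.25,1.5\n"
sample_meta = {"delimiter": ",", "with_label": True, "label_name": "y"}
print(label_in_header(sample, sample_meta))
```

To check a real file, read its first lines and pass them in place of `sample`.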
We can then upload the datasets:
pipeline_upload.upload(drop=1)
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2022-08-29 14:53:53.497 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291453533660260
2022-08-29 14:53:53.502 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:53:54.515 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:53:55.529 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:53:55.530 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:02
2022-08-29 14:53:56.544 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:53:57.570 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04
2022-08-29 14:53:58.588 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:05
2022-08-29 14:53:59.599 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291453533660260
2022-08-29 14:53:59.600 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:00:06
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2022-08-29 14:53:59.743 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291453596053400
2022-08-29 14:53:59.749 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:54:00.761 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:54:01.776 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:54:01.777 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:02
2022-08-29 14:54:02.792 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:54:03.805 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04
2022-08-29 14:54:04.897 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:05
2022-08-29 14:54:05.907 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291453596053400
2022-08-29 14:54:05.908 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:00:06
Use Data with Meta to Run A Modeling Task¶
When data is uploaded with meta, the fields specified in the meta should not be set again in the DataTransform component configuration, because DataTransform uses the fields in the meta to process the data. Please refer to the documentation of the DataTransform component for more details.
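One way to guard against setting a field twice is to compare the meta keys with any parameters intended for DataTransform. This is a minimal sketch: `overlapping_fields` is a hypothetical helper, and `candidate_params` is an illustrative dictionary, not a list of the component's actual parameters.

```python
def overlapping_fields(meta: dict, transform_params: dict) -> list:
    """Return, in sorted order, the keys that appear in both dictionaries."""
    return sorted(meta.keys() & transform_params.keys())


meta = {"delimiter": ",", "with_label": True, "label_name": "y",
        "input_format": "dense", "data_type": "float64"}
# Hypothetical parameters someone might try to pass to DataTransform.
candidate_params = {"with_label": True, "output_format": "dense"}

print(overlapping_fields(meta, candidate_params))
```

Any key reported here is already recorded in the meta and, per the note above, should be left out of the component configuration.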
from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation
from pipeline.interface import Data
Make a pipeline instance:
- initiator:
  - role: guest
  - party: 9999
- roles:
  - guest: 9999
  - host: 10000
pipeline = PipeLine() \
.set_initiator(role='guest', party_id=9999) \
.set_roles(guest=9999, host=10000)
Define a Reader to load data:
reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
table=breast_hetero_guest)
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
table=breast_hetero_host)
Add a DataTransform component to parse raw data into Data Instance. As shown above, the meta is already set when the data is uploaded, so the corresponding parameters are not set again in DataTransform.
data_transform_0 = DataTransform(name="data_transform_0")
Add the other components:
intersect_0 = Intersection(name="intersect_0")
hetero_secureboost_0 = HeteroSecureBoost(name="hetero_secureboost_0",
                                         num_trees=5,
                                         bin_num=16,
                                         task_type="classification",
                                         objective_param={"objective": "cross_entropy"},
                                         encrypt_param={"method": "paillier"},
                                         tree_param={"max_depth": 3})
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary")
Add components to the pipeline in order of execution:
- data_transform_0 consumes reader_0's output data
- intersect_0 consumes data_transform_0's output data
- hetero_secureboost_0 consumes intersect_0's output data
- evaluation_0 consumes hetero_secureboost_0's prediction result on training data
Then compile the pipeline to make it ready for submission.
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))
pipeline.compile()
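The data dependencies listed above form a simple chain. As an illustration only, the sketch below writes them out as a plain dictionary and derives an execution order with the standard `graphlib` module (Python 3.9+). It does not call FATE and makes no claim about how PipeLine schedules components internally.

```python
from graphlib import TopologicalSorter

# Each component maps to the set of components whose output it consumes.
dependencies = {
    "reader_0": set(),
    "data_transform_0": {"reader_0"},
    "intersect_0": {"data_transform_0"},
    "hetero_secureboost_0": {"intersect_0"},
    "evaluation_0": {"hetero_secureboost_0"},
}

# static_order yields every component after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

For a linear chain like this one, the resulting order matches the order in which the components were added.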
Now, submit (fit) our pipeline:
pipeline.fit()
2022-08-29 14:54:52.035 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291454509303160
2022-08-29 14:54:52.041 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:54:53.056 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:54:53.058 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:01
2022-08-29 14:54:54.075 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:02
2022-08-29 14:54:55.094 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:03
2022-08-29 14:54:56.112 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:04
2022-08-29 14:54:57.130 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:54:57.131 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:05
2022-08-29 14:54:58.150 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:06
2022-08-29 14:54:59.173 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:07
2022-08-29 14:55:00.199 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:08
2022-08-29 14:55:01.222 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:09
2022-08-29 14:55:02.245 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:10
2022-08-29 14:55:03.272 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:11
2022-08-29 14:55:04.296 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:12
2022-08-29 14:55:05.316 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:13
2022-08-29 14:55:06.353 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:14
2022-08-29 14:55:08.429 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:55:08.430 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:16
2022-08-29 14:55:09.449 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:17
2022-08-29 14:55:10.467 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:18
2022-08-29 14:55:11.487 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:19
2022-08-29 14:55:12.508 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:20
2022-08-29 14:55:13.529 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:21
2022-08-29 14:55:14.552 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:22
2022-08-29 14:55:15.579 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:23
2022-08-29 14:55:16.605 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:24
2022-08-29 14:55:17.623 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:25
2022-08-29 14:55:18.640 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:26
2022-08-29 14:55:19.655 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:27
2022-08-29 14:55:20.671 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:28
2022-08-29 14:55:21.688 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:29
2022-08-29 14:55:22.706 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:30
2022-08-29 14:55:23.724 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:31
2022-08-29 14:55:24.740 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:32
2022-08-29 14:55:25.788 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:33
2022-08-29 14:55:26.828 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:55:26.830 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:34
2022-08-29 14:55:27.844 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:35
2022-08-29 14:55:28.862 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:36
2022-08-29 14:55:29.878 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:37
2022-08-29 14:55:30.899 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:38
2022-08-29 14:55:31.920 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:39
2022-08-29 14:55:32.944 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:40
2022-08-29 14:55:33.969 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:41
2022-08-29 14:55:34.991 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:42
2022-08-29 14:55:36.018 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:43
2022-08-29 14:55:37.042 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:45
2022-08-29 14:55:38.069 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:46
2022-08-29 14:55:39.094 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:47
2022-08-29 14:55:40.113 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:48
2022-08-29 14:55:41.133 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:49
2022-08-29 14:55:42.150 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:50
2022-08-29 14:55:43.166 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:51
2022-08-29 14:55:44.226 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:52
2022-08-29 14:55:45.243 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:53
2022-08-29 14:55:46.261 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:54
2022-08-29 14:55:47.283 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:55
2022-08-29 14:55:48.298 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:56
2022-08-29 14:55:49.317 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:57
2022-08-29 14:55:50.332 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:58
2022-08-29 14:55:51.345 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:00:59
2022-08-29 14:55:52.369 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:00
2022-08-29 14:55:53.421 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:01
2022-08-29 14:55:54.442 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:02
2022-08-29 14:55:55.495 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:03
2022-08-29 14:55:56.513 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:04
2022-08-29 14:55:57.532 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:05
2022-08-29 14:55:58.553 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:06
2022-08-29 14:55:59.579 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:07
2022-08-29 14:56:00.599 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:08
2022-08-29 14:56:01.616 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:09
2022-08-29 14:56:02.635 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:10
2022-08-29 14:56:03.649 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:11
2022-08-29 14:56:04.668 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:12
2022-08-29 14:56:05.689 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:13
2022-08-29 14:56:06.708 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:14
2022-08-29 14:56:07.732 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:15
2022-08-29 14:56:08.749 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:16
2022-08-29 14:56:09.804 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:17
2022-08-29 14:56:10.859 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:18
2022-08-29 14:56:11.926 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:19
2022-08-29 14:56:12.963 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component hetero_secureboost_0, time elapse: 0:01:20
2022-08-29 14:56:13.975 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:125 -
2022-08-29 14:56:13.976 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component evaluation_0, time elapse: 0:01:21
2022-08-29 14:56:14.992 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component evaluation_0, time elapse: 0:01:22
2022-08-29 14:56:16.013 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component evaluation_0, time elapse: 0:01:23
2022-08-29 14:56:17.046 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component evaluation_0, time elapse: 0:01:25
2022-08-29 14:56:18.060 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291454509303160
2022-08-29 14:56:18.061 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:01:26
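The job monitor output above is long, so a short summary of when each component was last seen running can be easier to read. The sketch below extracts that with a regular expression. `last_elapsed_per_component` is a hypothetical helper, the pattern is inferred only from the message text shown in this log, and the sample contains three messages copied from it.

```python
import re

# Pattern inferred from the "Running component ..." messages in the log above.
LINE_RE = re.compile(r"Running component (\w+), time elapse: (\d+):(\d+):(\d+)")


def last_elapsed_per_component(log_text: str) -> dict:
    """Map each component to the last elapsed time (in seconds) logged while it ran."""
    result = {}
    for name, hours, minutes, seconds in LINE_RE.findall(log_text):
        # Later matches overwrite earlier ones, so the last value wins.
        result[name] = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    return result


sample_log = (
    "Running component reader_0, time elapse: 0:00:04\n"
    "Running component data_transform_0, time elapse: 0:00:14\n"
    "Running component evaluation_0, time elapse: 0:01:25\n"
)
print(last_elapsed_per_component(sample_log))
```

Differences between consecutive values give a rough per-component duration, limited by the roughly one-second polling interval visible in the log.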
For more examples of using pipeline to submit jobs, please refer to the PipeLine demos.