Pipeline Tutorial With Supporting Multiple ID Columns¶
Starting from FATE v1.9.0, FATE supports data with multiple match ID columns. To use this feature, data should be uploaded with meta information. Please go through the tutorial "Pipeline Tutorial With Using Data With Recording Meta" before proceeding.
Install¶
Pipeline is distributed along with FATE-Client.
pip install fate_client
To use Pipeline, we first need to specify which FATE Flow Service to connect to. Once fate_client is installed, a command-line entry point named pipeline becomes available:
!pipeline --help
Usage: pipeline [OPTIONS] COMMAND [ARGS]...

Options:
  --help  Show this message and exit.

Commands:
  config  pipeline config tool
  init    - DESCRIPTION: Pipeline Config Command.
Assume we have a FATE Flow Service at 127.0.0.1:9380 (the default in standalone mode), then execute:
!pipeline init --ip 127.0.0.1 --port 9380
Pipeline configuration succeeded.
Upload Data with Multiple id Columns¶
We should first prepare data with multiple id columns. Assume that we have two sets of sample data in the following format:
guest site's data:
phone,device_id,seq_id,x0
10000,device_a,seq_1,0
10002,device_b,seq_3,1
10004,device_c,seq_5,2
10006,device_d,seq_7,4
10008,device_e,seq_9,5
100010,device_f,seq_11,6
100012,device_g,seq_13,7
100014,device_h,seq_15,8
100016,device_i,seq_17,9
100018,device_j,seq_19,10
host site's data:
device_id,seq_id,phone,x0
device_d,seq_0,10000,0
device_e,seq_1,10001,1
device_f,seq_2,10002,2
device_g,seq_3,10003,3
device_h,seq_4,10004,4
device_i,seq_5,10005,5
device_j,seq_6,10005,6
device_k,seq_7,10006,7
device_l,seq_8,10007,8
device_k,seq_9,10008,9
fate_project_base="/data/projects/fate"
guest_data_path = fate_project_base + "/examples/data/guest_multi_id_columns.csv"
host_data_path = fate_project_base + "/examples/data/host_multi_id_columns.csv"
Write guest example data to a local file:
with open(guest_data_path, "w") as fout:
fout.write("phone,device_id,seq_id,x0\n10000,device_a,seq_1,0\n10002,device_b,seq_3,1\n10004,device_c,seq_5,2\n")
fout.write("10006,device_d,seq_7,4\n10008,device_e,seq_9,5\n100010,device_f,seq_11,6\n100012,device_g,seq_13,7\n")
fout.write("100014,device_h,seq_15,8\n100016,device_i,seq_17,9\n100018,device_j,seq_19,10\n")
Write host example data to a local file:
with open(host_data_path, "w") as fout:
fout.write("device_id,seq_id,phone,x0\ndevice_d,seq_0,10000,0\ndevice_e,seq_1,10001,1\ndevice_f,seq_2,10002,2\n")
fout.write("device_g,seq_3,10003,3\ndevice_h,seq_4,10004,4\ndevice_i,seq_5,10005,5\ndevice_j,seq_6,10005,6\n")
fout.write("device_k,seq_7,10006,7\ndevice_l,seq_8,10007,8\ndevice_k,seq_9,10008,9\n")
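Before uploading, it can help to sanity-check the file contents. The following is a minimal sketch, independent of FATE, with the guest sample rows inlined so it runs standalone; it parses the CSV and confirms the header and row count:

```python
import csv
import io

# Guest sample data inlined for illustration (same rows as written above)
guest_csv = (
    "phone,device_id,seq_id,x0\n"
    "10000,device_a,seq_1,0\n10002,device_b,seq_3,1\n10004,device_c,seq_5,2\n"
    "10006,device_d,seq_7,4\n10008,device_e,seq_9,5\n100010,device_f,seq_11,6\n"
    "100012,device_g,seq_13,7\n100014,device_h,seq_15,8\n"
    "100016,device_i,seq_17,9\n100018,device_j,seq_19,10\n"
)

reader = csv.reader(io.StringIO(guest_csv))
header = next(reader)      # first line is the header
rows = list(reader)        # remaining lines are data rows

print(header)              # ['phone', 'device_id', 'seq_id', 'x0']
print(len(rows))           # 10
```

To check the real files, replace the inlined string with `open(guest_data_path)`.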
Make a pipeline instance with the following setting:
initiator:
role: guest
party: 9999
roles:
guest: 9999
from pipeline.backend.pipeline import PipeLine
pipeline_upload = PipeLine().set_initiator(role="guest", party_id=9999).set_roles(guest=9999)
Define data meta:
guest_data_meta = {"delimiter": ",", "with_label": False,
"input_format": "dense", "data_type": "int",
"with_match_id": True, # with_match_id should be true
"id_list": ["phone","device_id","seq_id"]} # id_list specifies the id columns
host_data_meta = {"delimiter": ",", "with_label": False,
"input_format": "dense", "data_type": "int",
"with_match_id": True, # with_match_id should be true
"id_list": ["device_id","seq_id","phone"]} # id_list specifies the id columns
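Every column in id_list must appear in the uploaded file's header, and with_match_id must be true. A small local check can catch mismatches before uploading; check_id_list below is a hypothetical helper written for illustration, not a FATE API (FATE performs its own validation on upload):

```python
# Hypothetical helper for illustration only; not part of the FATE API.
def check_id_list(meta, header):
    """Verify that the meta's id columns are consistent with the CSV header."""
    if not meta.get("with_match_id"):
        raise ValueError("with_match_id must be True when id_list is set")
    missing = [col for col in meta["id_list"] if col not in header]
    if missing:
        raise ValueError(f"id columns not in header: {missing}")
    return True

guest_data_meta = {"delimiter": ",", "with_label": False,
                   "input_format": "dense", "data_type": "int",
                   "with_match_id": True,
                   "id_list": ["phone", "device_id", "seq_id"]}

guest_header = ["phone", "device_id", "seq_id", "x0"]
print(check_id_list(guest_data_meta, guest_header))  # True
```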
multi_id_guest = {"name": "multi_id_guest", "namespace": f"experiment"}
multi_id_host = {"name": "multi_id_host", "namespace": f"experiment"}
pipeline_upload.add_upload_data(file=guest_data_path,
table_name=multi_id_guest["name"],
namespace=multi_id_guest["namespace"],
head=1, partition=4,
extend_sid=True, # upload data with automatically append sample id
with_meta=True, meta=guest_data_meta) # with_meta=True means uploading data with meta
pipeline_upload.add_upload_data(file=host_data_path,
table_name=multi_id_host["name"],
namespace=multi_id_host["namespace"],
head=1, partition=4,
extend_sid=True,
with_meta=True, meta=host_data_meta)
We can then upload the data:
pipeline_upload.upload(drop=1)
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2022-08-29 14:43:21.101 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291443209718580
2022-08-29 14:43:21.108 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:43:22.117 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:43:23.132 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:02
2022-08-29 14:43:24.147 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:43:25.165 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04
2022-08-29 14:43:26.184 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:05
2022-08-29 14:43:27.197 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291443209718580
2022-08-29 14:43:27.198 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:00:06
UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%
2022-08-29 14:43:27.349 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291443272038530
2022-08-29 14:43:27.356 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:43:28.364 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:43:30.391 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:03
2022-08-29 14:43:31.406 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:04
2022-08-29 14:43:32.421 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:05
2022-08-29 14:43:33.439 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component upload_0, time elapse: 0:00:06
2022-08-29 14:43:34.451 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291443272038530
2022-08-29 14:43:34.452 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:00:07
Run Intersection Task Using a Specified id Column¶
from pipeline.component import Reader, DataTransform, Intersection
from pipeline.interface import Data
Make a pipeline instance with the following setting:
initiator:
role: guest
party: 9999
roles:
guest: 9999
host: 10000
pipeline = PipeLine() \
.set_initiator(role='guest', party_id=9999) \
.set_roles(guest=9999, host=10000)
Define a Reader component to load data:
reader_0 = Reader(name="reader_0")
# set guest parameter
reader_0.get_party_instance(role='guest', party_id=9999).component_param(
table=multi_id_guest)
# set host parameter
reader_0.get_party_instance(role='host', party_id=10000).component_param(
table=multi_id_host)
Configure match_id_name in the DataTransform component:
data_transform_0 = DataTransform(name="data_transform_0",
match_id_name="device_id") # specify "device_id" to be the match id column
Include an Intersection component:
intersect_0 = Intersection(name="intersect_0")
Add components to pipeline, in order of execution:

data_transform_0 consumes reader_0's output data
intersect_0 consumes data_transform_0's output data
Then compile our pipeline to make it ready for submission.
pipeline.add_component(reader_0)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.compile();
Now, submit (fit) our pipeline:
pipeline.fit()
2022-08-29 14:44:14.834 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:83 - Job id is 202208291444146346770
2022-08-29 14:44:14.842 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:00
2022-08-29 14:44:15.855 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:98 - Job is still waiting, time elapse: 0:00:01
2022-08-29 14:44:16.871 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:02
2022-08-29 14:44:17.891 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:03
2022-08-29 14:44:18.910 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:04
2022-08-29 14:44:19.930 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component reader_0, time elapse: 0:00:05
2022-08-29 14:44:21.988 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:07
2022-08-29 14:44:23.001 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:08
2022-08-29 14:44:24.022 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:09
2022-08-29 14:44:25.043 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:10
2022-08-29 14:44:26.064 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:11
2022-08-29 14:44:27.091 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:12
2022-08-29 14:44:28.117 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:13
2022-08-29 14:44:29.135 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:14
2022-08-29 14:44:30.205 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:15
2022-08-29 14:44:31.225 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component data_transform_0, time elapse: 0:00:16
2022-08-29 14:44:32.245 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:17
2022-08-29 14:44:33.261 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:18
2022-08-29 14:44:34.277 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:19
2022-08-29 14:44:35.299 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:20
2022-08-29 14:44:36.323 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:21
2022-08-29 14:44:37.345 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:22
2022-08-29 14:44:38.366 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:23
2022-08-29 14:44:39.392 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:24
2022-08-29 14:44:40.419 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:25
2022-08-29 14:44:41.441 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:26
2022-08-29 14:44:42.459 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:27
2022-08-29 14:44:43.475 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:28
2022-08-29 14:44:44.507 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:29
2022-08-29 14:44:45.523 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:30
2022-08-29 14:44:46.538 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:31
2022-08-29 14:44:47.555 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:32
2022-08-29 14:44:48.569 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:33
2022-08-29 14:44:49.607 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:34
2022-08-29 14:44:50.628 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:127 - Running component intersect_0, time elapse: 0:00:35
2022-08-29 14:44:51.640 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:89 - Job is success!!! Job id is 202208291444146346770
2022-08-29 14:44:51.642 | INFO | pipeline.utils.invoker.job_submitter:monitor_job_status:90 - Total time: 0:00:36
Get the intersection task's summary info:
print(pipeline.get_component("intersect_0").get_summary())
{'cardinality_only': False, 'intersect_num': 7, 'intersect_rate': 0.7}
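This summary matches what a plain local computation on the device_id column would give (a sketch, independent of FATE, with the sample device ids inlined): the guest has 10 devices (device_a..device_j), the host has device_d..device_l, so 7 devices overlap and the guest-side intersect rate is 7/10 = 0.7.

```python
# Recompute the intersection locally on the "device_id" match column (illustration only)
guest_devices = {f"device_{c}" for c in "abcdefghij"}  # 10 guest devices
host_devices = {"device_d", "device_e", "device_f", "device_g", "device_h",
                "device_i", "device_j", "device_k", "device_l"}  # duplicate device_k collapses in a set

overlap = guest_devices & host_devices
intersect_num = len(overlap)
intersect_rate = intersect_num / len(guest_devices)

print(intersect_num, intersect_rate)  # 7 0.7
```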
Get the intersection task's output data:
print(pipeline.get_component("intersect_0").get_output_data())
                          extend_sid device_id  x0
0  e19da596276511edb731acde480011223  device_d   4
1  e19da596276511edb731acde480011224  device_e   5
2  e19da596276511edb731acde480011225  device_f   6
3  e19da596276511edb731acde480011226  device_g   7
4  e19da596276511edb731acde480011227  device_h   8
5  e19da596276511edb731acde480011228  device_i   9
6  e19da596276511edb731acde480011229  device_j  10
For more examples of using Pipeline to submit jobs, please refer to the pipeline demos.