{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "84c6c9b0",
   "metadata": {},
   "source": [
    "# Pipeline Tutorial With Supporing Multiple ID Columns"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "08fdf603",
   "metadata": {},
   "source": [
    "Starting at FATE-v1.9.0, FATE supports data with multiple matching columns. To use this feature, data should be uploaded with meta. Please check the [tutorial](./pipeline_tutorial_uploading_data_with_meta.ipynb) \"Pipeline Tutorial With Using Data With Recording Meta\" first before proceeding."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f0ac0403",
   "metadata": {},
   "source": [
    "## Install"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e8dafb8f",
   "metadata": {},
   "source": [
    "`Pipeline` is distributed along with [FATE-Client](https://pypi.org/project/fate-client/)."
   ]
  },
  {
   "cell_type": "markdown",
   "id": "44ff3e2f",
   "metadata": {},
   "source": [
    "```bash\n",
    "pip install fate_client\n",
    "```\n",
    "To use Pipeline, we need to first specify which `FATE Flow Service` to connect to. Once `fate_client` installed, one can find a cmd enterpoint named `pipeline`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "f973baa6",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\r\n",
      "\r\n",
      "Options:\r\n",
      "  --help  Show this message and exit.\r\n",
      "\r\n",
      "Commands:\r\n",
      "  config  pipeline config tool\r\n",
      "  init    \b - DESCRIPTION: Pipeline Config Command.\r\n"
     ]
    }
   ],
   "source": [
    "!pipeline --help"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4801e5e1",
   "metadata": {},
   "source": [
    "Assume we have a FATE Flow Service in 127.0.0.1:9380(defaults in standalone), then exec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "8feda3f7",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pipeline configuration succeeded.\r\n"
     ]
    }
   ],
   "source": [
    "!pipeline init --ip 127.0.0.1 --port 9380"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ad883490",
   "metadata": {},
   "source": [
    "## Upload Data with Multiple id Columns"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cba4c378",
   "metadata": {},
   "source": [
    "We should first prepare the data with multiple id columns. Assume that we have twe sample data with the follwing format:\n",
    "\n",
    "guest site's data:  \n",
    "\n",
    "phone,device_id,seq_id,x0  \n",
    "10000,device_a,seq_1,0  \n",
    "10002,device_b,seq_3,1  \n",
    "10004,device_c,seq_5,2  \n",
    "10006,device_d,seq_7,4  \n",
    "10008,device_e,seq_9,5  \n",
    "100010,device_f,seq_11,6  \n",
    "100012,device_g,seq_13,7  \n",
    "100014,device_h,seq_15,8  \n",
    "100016,device_i,seq_17,9  \n",
    "100018,device_j,seq_19,10  \n",
    "\n",
    "host site's data:  \n",
    "device_id,seq_id,phone,x0  \n",
    "device_d,seq_0,10000,0  \n",
    "device_e,seq_1,10001,1  \n",
    "device_f,seq_2,10002,2  \n",
    "device_g,seq_3,10003,3  \n",
    "device_h,seq_4,10004,4  \n",
    "device_i,seq_5,10005,5  \n",
    "device_j,seq_6,10005,6  \n",
    "device_k,seq_7,10006,7  \n",
    "device_l,seq_8,10007,8  \n",
    "device_k,seq_9,10008,9  "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 81,
   "id": "94182179",
   "metadata": {},
   "outputs": [],
   "source": [
    "fate_project_base=\"/data/projects/fate\"\n",
    "\n",
    "guest_data_path = fate_project_base + \"/examples/data/guest_multi_id_columns.csv\"\n",
    "host_data_path = fate_project_base + \"/examples/data/host_multi_id_columns.csv\""
   ]
  },
  {
   "cell_type": "markdown",
   "id": "7dd10c3e",
   "metadata": {},
   "source": [
    "Write guest example data to local"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 61,
   "id": "c562d914",
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(guest_data_path, \"w\") as fout:\n",
    "    fout.write(\"phone,device_id,seq_id,x0\\n10000,device_a,seq_1,0\\n10002,device_b,seq_3,1\\n10004,device_c,seq_5,2\\n\")\n",
    "    fout.write(\"10006,device_d,seq_7,4\\n10008,device_e,seq_9,5\\n100010,device_f,seq_11,6\\n100012,device_g,seq_13,7\\n\")\n",
    "    fout.write(\"100014,device_h,seq_15,8\\n100016,device_i,seq_17,9\\n100018,device_j,seq_19,10\\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f707fd19",
   "metadata": {},
   "source": [
    "Write host example data to local"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 62,
   "id": "383152a6",
   "metadata": {},
   "outputs": [],
   "source": [
    "with open(host_data_path, \"w\") as fout:\n",
    "    fout.write(\"device_id,seq_id,phone,x0\\ndevice_d,seq_0,10000,0\\ndevice_e,seq_1,10001,1\\ndevice_f,seq_2,10002,2\\n\")\n",
    "    fout.write(\"device_g,seq_3,10003,3\\ndevice_h,seq_4,10004,4\\ndevice_i,seq_5,10005,5\\ndevice_j,seq_6,10005,6\\n\")\n",
    "    fout.write(\"device_k,seq_7,10006,7\\ndevice_l,seq_8,10007,8\\ndevice_k,seq_9,10008,9\\n\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "54728c5f",
   "metadata": {},
   "source": [
    "Make a `pipeline` instance with the following setting:\n",
    "```yaml\n",
    "initiator:\n",
    "    role: guest\n",
    "    party: 9999\n",
    "roles:\n",
    "    guest: 9999\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 63,
   "id": "e180bd5a",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pipeline.backend.pipeline import PipeLine"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 64,
   "id": "5531ed49",
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_upload = PipeLine().set_initiator(role=\"guest\", party_id=9999).set_roles(guest=9999)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9c3dc70c",
   "metadata": {},
   "source": [
    "Define data meta:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 65,
   "id": "2150c330",
   "metadata": {},
   "outputs": [],
   "source": [
    "guest_data_meta = {\"delimiter\": \",\", \"with_label\": False,\n",
    "                   \"input_format\": \"dense\", \"data_type\": \"int\",\n",
    "                   \"with_match_id\": True,                 # with_match_id should be true\n",
    "                   \"id_list\": [\"phone\",\"device_id\",\"seq_id\"]} # id_list specifies the id columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 66,
   "id": "6af9248d",
   "metadata": {},
   "outputs": [],
   "source": [
    "host_data_meta = {\"delimiter\": \",\", \"with_label\": False,\n",
    "                  \"input_format\": \"dense\", \"data_type\": \"int\",\n",
    "                  \"with_match_id\": True,                 # with_match_id should be true\n",
    "                  \"id_list\": [\"device_id\",\"seq_id\",\"phone\"]} # id_list specifies the id columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 67,
   "id": "f4f91437",
   "metadata": {},
   "outputs": [],
   "source": [
    "multi_id_guest = {\"name\": \"multi_id_guest\", \"namespace\": f\"experiment\"}\n",
    "multi_id_host = {\"name\": \"multi_id_host\", \"namespace\": f\"experiment\"}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 68,
   "id": "4f5d4b3d",
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_upload.add_upload_data(file=guest_data_path,\n",
    "                                table_name=multi_id_guest[\"name\"],         \n",
    "                                namespace=multi_id_guest[\"namespace\"],         \n",
    "                                head=1, partition=4,\n",
    "                                extend_sid=True,                      # upload data with automatically append sample id\n",
    "                                with_meta=True, meta=guest_data_meta) # with_meta=True means uploading data with meta                       \n",
    "\n",
    "pipeline_upload.add_upload_data(file=host_data_path,\n",
    "                                table_name=multi_id_host[\"name\"],\n",
    "                                namespace=multi_id_host[\"namespace\"],\n",
    "                                head=1, partition=4,      \n",
    "                                extend_sid=True,\n",
    "                                with_meta=True, meta=host_data_meta)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c5703306",
   "metadata": {},
   "source": [
    "We can then upload the dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 70,
   "id": "c24d8bea",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2022-08-29 14:43:21.101\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m83\u001b[0m - \u001b[1mJob id is 202208291443209718580\n",
      "\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:21.108\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2022-08-29 14:43:22.117\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:01\u001b[0m\n",
      "\u001b[0mm2022-08-29 14:43:23.131\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n",
      "\u001b[32m2022-08-29 14:43:23.132\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:02\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:24.147\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:03\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:25.165\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:04\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:26.184\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:05\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:27.197\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m89\u001b[0m - \u001b[1mJob is success!!! Job id is 202208291443209718580\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:27.198\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m90\u001b[0m - \u001b[1mTotal time: 0:00:06\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2022-08-29 14:43:27.349\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m83\u001b[0m - \u001b[1mJob id is 202208291443272038530\n",
      "\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:27.356\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2022-08-29 14:43:28.364\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:01\u001b[0m\n",
      "\u001b[0mm2022-08-29 14:43:30.390\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n",
      "\u001b[32m2022-08-29 14:43:30.391\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:03\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:31.406\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:04\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:32.421\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:05\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:33.439\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:06\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:34.451\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m89\u001b[0m - \u001b[1mJob is success!!! Job id is 202208291443272038530\u001b[0m\n",
      "\u001b[32m2022-08-29 14:43:34.452\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m90\u001b[0m - \u001b[1mTotal time: 0:00:07\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "pipeline_upload.upload(drop=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "60d06738",
   "metadata": {},
   "source": [
    "## Run Intersection Task Using Specified id Column"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 71,
   "id": "260998a3",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pipeline.component import Reader, DataTransform, Intersection\n",
    "from pipeline.interface import Data"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e482804a",
   "metadata": {},
   "source": [
    "Make a `pipeline` instance:,\n",
    "\n",
    "```yaml\n",
    "initiator:\n",
    "    role: guest\n",
    "    party: 9999\n",
    "roles:\n",
    "    guest: 9999\n",
    "    host: 10000\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 72,
   "id": "41e88c97",
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = PipeLine() \\\n",
    "        .set_initiator(role='guest', party_id=9999) \\\n",
    "        .set_roles(guest=9999, host=10000)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "603e76e0",
   "metadata": {},
   "source": [
    "Define `Reader` to load data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 73,
   "id": "ed6c55bd",
   "metadata": {},
   "outputs": [],
   "source": [
    "reader_0 = Reader(name=\"reader_0\")\n",
    "# set guest parameter\n",
    "reader_0.get_party_instance(role='guest', party_id=9999).component_param(\n",
    "    table=multi_id_guest)\n",
    "# set host parameter\n",
    "reader_0.get_party_instance(role='host', party_id=10000).component_param(\n",
    "    table=multi_id_host)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f3b82e7e",
   "metadata": {},
   "source": [
    "Configure `match_id_name` in `DataTransform` component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 74,
   "id": "047d0346",
   "metadata": {},
   "outputs": [],
   "source": [
    "data_transform_0 = DataTransform(name=\"data_transform_0\",\n",
    "                                 match_id_name=\"device_id\") # specify \"device_id\" to be the match id column"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "b3547d52",
   "metadata": {},
   "source": [
    "Include an `Intersection` component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 75,
   "id": "adbbaf77",
   "metadata": {},
   "outputs": [],
   "source": [
    "intersect_0 = Intersection(name=\"intersect_0\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "1d84a0d2",
   "metadata": {},
   "source": [
    "Add components to pipeline, in order of execution:  \n",
    "\n",
    "* `data_transform_0` comsumes `reader_0's` output data  \n",
    "* `intersect_0` comsumes `data_transform_0's` output data\n",
    "\n",
    "Then compile our pipeline to make it ready for submission."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 76,
   "id": "eab6a1b4",
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.add_component(reader_0)\n",
    "pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))\n",
    "pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))\n",
    "pipeline.compile();"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e12a839d",
   "metadata": {},
   "source": [
    "Now, submit(fit) our pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 77,
   "id": "edc9542b",
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2022-08-29 14:44:14.834\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m83\u001b[0m - \u001b[1mJob id is 202208291444146346770\n",
      "\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:14.842\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:15.855\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m98\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:01\u001b[0m\n",
      "\u001b[0mm2022-08-29 14:44:16.870\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n",
      "\u001b[32m2022-08-29 14:44:16.871\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:02\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:17.891\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:03\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:18.910\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:04\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:19.930\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:05\u001b[0m\n",
      "\u001b[0mm2022-08-29 14:44:21.987\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n",
      "\u001b[32m2022-08-29 14:44:21.988\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:07\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:23.001\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:08\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:24.022\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:09\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:25.043\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:10\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:26.064\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:11\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:27.091\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:12\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:28.117\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:13\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:29.135\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:14\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:30.205\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:15\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:31.225\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:16\u001b[0m\n",
      "\u001b[0mm2022-08-29 14:44:32.244\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m125\u001b[0m - \u001b[1m\n",
      "\u001b[32m2022-08-29 14:44:32.245\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:17\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:33.261\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:18\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:34.277\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:19\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:35.299\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:20\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:36.323\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:21\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:37.345\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:22\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:38.366\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:23\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:39.392\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:24\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:40.419\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:25\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:41.441\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:26\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:42.459\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:27\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:43.475\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:28\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:44.507\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:29\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:45.523\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:30\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:46.538\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:31\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:47.555\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:32\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:48.569\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:33\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:49.607\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:34\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:50.628\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m127\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:35\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:51.640\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m89\u001b[0m - \u001b[1mJob is success!!! Job id is 202208291444146346770\u001b[0m\n",
      "\u001b[32m2022-08-29 14:44:51.642\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m90\u001b[0m - \u001b[1mTotal time: 0:00:36\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "pipeline.fit()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "88d86e5a",
   "metadata": {},
   "source": [
    "Get intersection task summary info"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 79,
   "id": "eff90b65",
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'cardinality_only': False, 'intersect_num': 7, 'intersect_rate': 0.7}\n"
     ]
    }
   ],
   "source": [
    "print(pipeline.get_component(\"intersect_0\").get_summary())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "11f0137f",
   "metadata": {},
   "source": [
    "Get intersection task's output data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 80,
   "id": "f0081c6f",
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "                          extend_sid device_id  x0\n",
      "0  e19da596276511edb731acde480011223  device_d   4\n",
      "1  e19da596276511edb731acde480011224  device_e   5\n",
      "2  e19da596276511edb731acde480011225  device_f   6\n",
      "3  e19da596276511edb731acde480011226  device_g   7\n",
      "4  e19da596276511edb731acde480011227  device_h   8\n",
      "5  e19da596276511edb731acde480011228  device_i   9\n",
      "6  e19da596276511edb731acde480011229  device_j  10\n"
     ]
    }
   ],
   "source": [
    "print(pipeline.get_component(\"intersect_0\").get_output_data())"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "e0c9f78c",
   "metadata": {},
   "source": [
    "For more examples on using pipeline to submit jobs, please refer to [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
