{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipeline Tutorial with HeteroSecureBoost"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### install"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).\n",
    "\n",
    "```bash\n",
    "pip install fate_client\n",
    "```\n",
    "\n",
    "To use Pipeline, we need to first specify which `FATE Flow Service` to connect to. Once `fate_client` installed, one can find an cmd enterpoint name `pipeline`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\n",
      "\n",
      "Options:\n",
      "  --help  Show this message and exit.\n",
      "\n",
      "Commands:\n",
      "  config  pipeline config tool\n",
      "  init    - DESCRIPTION: Pipeline Config Command.\n"
     ]
    }
   ],
   "source": [
    "!pipeline --help"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Assume we have a `FATE Flow Service` in 127.0.0.1:9380(defaults in standalone), then exec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pipeline configuration succeeded.\n"
     ]
    }
   ],
   "source": [
    "!pipeline init --ip 127.0.0.1 --port 9380"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Hetero SecureBoost Example"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " Before start a modeling task, data to be used should be uploaded. Please refer to this [guide](https://github.com/FederatedAI/FATE/blob/master/doc/tutorial/pipeline/pipeline_tutorial_upload.ipynb)."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `pipeline` package provides components to compose a `FATE pipeline`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pipeline.backend.pipeline import PipeLine\n",
    "from pipeline.component import Reader, DataTransform, Intersection, HeteroSecureBoost, Evaluation\n",
    "from pipeline.interface import Data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Make a `pipeline` instance:\n",
    "\n",
    "    - initiator: \n",
    "        * role: guest\n",
    "        * party: 9999\n",
    "    - roles:\n",
    "        * guest: 9999\n",
    "        * host: 10000\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = PipeLine() \\\n",
    "        .set_initiator(role='guest', party_id=9999) \\\n",
    "        .set_roles(guest=9999, host=10000)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define a `Reader` to load data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "reader_0 = Reader(name=\"reader_0\")\n",
    "# set guest parameter\n",
    "reader_0.get_party_instance(role='guest', party_id=9999).component_param(\n",
    "    table={\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"})\n",
    "# set host parameter\n",
    "reader_0.get_party_instance(role='host', party_id=10000).component_param(\n",
    "    table={\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add a `DataTransform` component to parse raw data into Data Instance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "data_transform_0 = DataTransform(name=\"data_transform_0\")\n",
    "# set guest parameter\n",
    "data_transform_0.get_party_instance(role='guest', party_id=9999).component_param(\n",
    "    with_label=True)\n",
    "data_transform_0.get_party_instance(role='host', party_id=[10000]).component_param(\n",
    "    with_label=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add a `Intersection` component to perform PSI for hetero-scenario"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "intersect_0 = Intersection(name=\"intersect_0\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we define the `HeteroSecureBoost` component. The following parameters will be set for all parties involved."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "hetero_secureboost_0 = HeteroSecureBoost(name=\"hetero_secureboost_0\",\n",
    "                                         num_trees=5,\n",
    "                                         bin_num=16,\n",
    "                                         task_type=\"classification\",\n",
    "                                         objective_param={\"objective\": \"cross_entropy\"},\n",
    "                                         encrypt_param={\"method\": \"paillier\"},\n",
    "                                         tree_param={\"max_depth\": 3})\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To show the evaluation result, an \"Evaluation\" component is needed."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "evaluation_0 = Evaluation(name=\"evaluation_0\", eval_type=\"binary\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add components to pipeline, in order of execution:\n",
    "\n",
    "    - data_transform_0 comsume reader_0's output data\n",
    "    - intersect_0 comsume data_transform_0's output data\n",
    "    - hetero_secureboost_0 consume intersect_0's output data\n",
    "    - evaluation_0 consume hetero_secureboost_0's prediciton result on training data\n",
    "\n",
    "Then compile our pipeline to make it ready for submission."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.add_component(reader_0)\n",
    "pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))\n",
    "pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))\n",
    "pipeline.add_component(hetero_secureboost_0, data=Data(train_data=intersect_0.output.data))\n",
    "pipeline.add_component(evaluation_0, data=Data(data=hetero_secureboost_0.output.data))\n",
    "pipeline.compile();\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, submit(fit) our pipeline:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2021-12-31 03:24:22.633\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310324182459270\n",
      "\u001b[0m\n",
      "\u001b[32m2021-12-31 03:24:23.152\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:24:23.671\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:24:27.861\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_0, time elapse: 0:00:05\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:24:30.533\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:24:34.732\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:12\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:24:37.333\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:24:43.185\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:20\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:24:46.315\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:25:18.915\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component hetero_secureboost_0, time elapse: 0:00:56\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:25:22.043\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:25:27.867\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component evaluation_0, time elapse: 0:01:05\u001b[0m\n",
      "\u001b[32m2021-12-31 03:25:31.005\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202112310324182459270\u001b[0m\n",
      "\u001b[32m2021-12-31 03:25:31.007\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:01:08\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "pipeline.fit()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Once training is done, trained model may be used for prediction. Optionally, save the trained pipeline for future use."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline.dump(\"pipeline_saved.pkl\");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "First, deploy needed components from train pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline = PipeLine.load_model_from_file('pipeline_saved.pkl')\n",
    "pipeline.deploy_component([pipeline.data_transform_0, pipeline.intersect_0, pipeline.hetero_secureboost_0]);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define new `Reader` components for reading prediction data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "reader_1 = Reader(name=\"reader_1\")\n",
    "reader_1.get_party_instance(role=\"guest\", party_id=9999).component_param(table={\"name\": \"breast_hetero_guest\", \"namespace\": \"experiment\"})\n",
    "reader_1.get_party_instance(role=\"host\", party_id=10000).component_param(table={\"name\": \"breast_hetero_host\", \"namespace\": \"experiment\"})"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, define new `Evaluation` component."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "evaluation_0 = Evaluation(name=\"evaluation_0\", eval_type=\"binary\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Add components to predict pipeline in order of execution:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "predict_pipeline = PipeLine()\n",
    "predict_pipeline.add_component(reader_1)\\\n",
    "                .add_component(pipeline, \n",
    "                               data=Data(predict_input={pipeline.data_transform_0.input.data: reader_1.output.data}))\\\n",
    "                .add_component(evaluation_0, data=Data(data=pipeline.hetero_secureboost_0.output.data));\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then, run prediction job"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2021-12-31 03:25:35.541\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202112310325328444510\n",
      "\u001b[0m\n",
      "\u001b[32m2021-12-31 03:25:47.384\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:11\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:25:47.903\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:25:52.078\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component reader_1, time elapse: 0:00:16\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:25:54.161\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:25:58.545\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component data_transform_0, time elapse: 0:00:23\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:26:01.167\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:26:07.502\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component intersect_0, time elapse: 0:00:31\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:26:10.137\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:26:18.580\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component hetero_secureboost_0, time elapse: 0:00:43\u001b[0m\n",
      "\u001b[0mm2021-12-31 03:26:21.245\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m173\u001b[0m - \u001b[1m\n",
      "\u001b[32m2021-12-31 03:26:25.480\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component evaluation_0, time elapse: 0:00:49\u001b[0m\n",
      "\u001b[32m2021-12-31 03:26:28.136\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202112310325328444510\u001b[0m\n",
      "\u001b[32m2021-12-31 03:26:28.143\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:52\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "predict_pipeline.predict()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For more demo on using pipeline to submit jobs, please refer to [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)"
   ]
  }
 ],
 "metadata": {
  "interpreter": {
   "hash": "ad4309918fa4cd1705b305e369b2f64d901b1851e9144aef7b9b07ea3efcb1bb"
  },
  "kernelspec": {
   "display_name": "Python 3.6.15 64-bit ('py36': venv)",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
