{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pipeline Upload Data Tutorial "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### install"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`Pipeline` is distributed along with [fate_client](https://pypi.org/project/fate-client/).\n",
    "\n",
    "```bash\n",
    "pip install fate_client\n",
    "```\n",
    "\n",
    "To use Pipeline, we need to first specify which `FATE Flow Service` to connect to. Once `fate_client` installed, one can find an cmd enterpoint name `pipeline`:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Usage: pipeline [OPTIONS] COMMAND [ARGS]...\n",
      "\n",
      "Options:\n",
      "  --help  Show this message and exit.\n",
      "\n",
      "Commands:\n",
      "  config  pipeline config tool\n",
      "  init    - DESCRIPTION: Pipeline Config Command.\n"
     ]
    }
   ],
   "source": [
    "!pipeline --help"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Assume we have a `FATE Flow Service` in 127.0.0.1:9380(defaults in standalone), then exec"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Pipeline configuration succeeded.\n"
     ]
    }
   ],
   "source": [
    "!pipeline init --ip 127.0.0.1 --port 9380"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### upload data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    " Before start a modeling task, the data to be used should be uploaded. \n",
    " Typically, a party is usually a cluster which include multiple nodes. Thus, when we upload these data, the data will be allocated to those nodes."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pipeline.backend.pipeline import PipeLine"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Make a `pipeline` instance:\n",
    "\n",
    "    - initiator: \n",
    "        * role: guest\n",
    "        * party: 9999\n",
    "    - roles:\n",
    "        * guest: 9999\n",
    "\n",
    "note that only local party id is needed.\n",
    "    "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_upload = PipeLine().set_initiator(role='guest', party_id=9999).set_roles(guest=9999)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define partitions for data storage"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "partition = 4"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Define table name and namespace, which will be used in FATE job configuration"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "dense_data_guest = {\"name\": \"breast_hetero_guest\", \"namespace\": f\"experiment\"}\n",
    "dense_data_host = {\"name\": \"breast_hetero_host\", \"namespace\": f\"experiment\"}\n",
    "tag_data = {\"name\": \"breast_hetero_host\", \"namespace\": f\"experiment\"}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now, we add data to be uploaded"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "\n",
    "data_base = \"/workspace/FATE/\"\n",
    "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_guest.csv\"),\n",
    "                                table_name=dense_data_guest[\"name\"],             # table name\n",
    "                                namespace=dense_data_guest[\"namespace\"],         # namespace\n",
    "                                head=1, partition=partition)               # data info\n",
    "\n",
    "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_host.csv\"),\n",
    "                                table_name=dense_data_host[\"name\"],\n",
    "                                namespace=dense_data_host[\"namespace\"],\n",
    "                                head=1, partition=partition)\n",
    "\n",
    "pipeline_upload.add_upload_data(file=os.path.join(data_base, \"examples/data/breast_hetero_host.csv\"),\n",
    "                                table_name=tag_data[\"name\"],\n",
    "                                namespace=tag_data[\"namespace\"],\n",
    "                                head=1, partition=partition)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We can then upload data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2021-11-15 11:26:40.541\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151126388669570\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:26:42.601\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:02\u001b[0m\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:26:49.260\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:08\u001b[0m\n",
      "\u001b[32m2021-11-15 11:26:52.644\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202111151126388669570\u001b[0m\n",
      "\u001b[32m2021-11-15 11:26:52.649\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:12\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2021-11-15 11:26:53.998\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151126526555290\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:26:54.006\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:00.984\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:06\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:02.770\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202111151126526555290\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:02.771\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:08\u001b[0m\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      " UPLOADING:||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||100.00%\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "\u001b[32m2021-11-15 11:27:03.959\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m123\u001b[0m - \u001b[1mJob id is 202111151127027776050\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:03.969\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m144\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KJob is still waiting, time elapse: 0:00:00\u001b[0m\n",
      "\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:11.018\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m177\u001b[0m - \u001b[1m\u001b[80D\u001b[1A\u001b[KRunning component upload_0, time elapse: 0:00:07\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:12.669\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m131\u001b[0m - \u001b[1mJob is success!!! Job id is 202111151127027776050\u001b[0m\n",
      "\u001b[32m2021-11-15 11:27:12.671\u001b[0m | \u001b[1mINFO    \u001b[0m | \u001b[36mpipeline.utils.invoker.job_submitter\u001b[0m:\u001b[36mmonitor_job_status\u001b[0m:\u001b[36m132\u001b[0m - \u001b[1mTotal time: 0:00:08\u001b[0m\n"
     ]
    }
   ],
   "source": [
    "pipeline_upload.upload(drop=1)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For more demo on using pipeline to submit jobs, please refer to [pipeline demos](https://github.com/FederatedAI/FATE/tree/master/examples/pipeline/demo)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.15"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
