跳转至

开发指南

为 FATE 开发可运行的算法模块

本文档描述了如何开发算法模块,使得该模块可以在 FATE 架构下被调用。

要开发模块,需要执行以下 6 个步骤。

  1. 定义将在此模块中使用的 python 参数对象。
  2. 定义模块的 meta 文件。
  3. 如果模块需要联邦,则需定义传输变量配置文件。
  4. 您的算法模块需要继承model_base类,并完成几个指定的函数。
  5. 定义模型保存所需的protobuf文件。
  6. 若希望通过python脚本直接启动组件,需要在fate_client中定义Pipeline组件。
  7. 重启fate flow服务。

在以下各节中,我们将通过 hetero_lr 详细描述这 7 个步骤。

第一步:定义此模块将使用的参数对象

参数对象是将用户定义的运行时参数传递给开发模块的唯一方法,因此每个模块都有其自己的参数对象。

为定义可用的参数对象,需要三个步骤。

  1. 打开一个新的 python 文件,将其重命名为 xxx_param.py,其中xxx代表您模块的名称,并将其放置在 python/federatedml/param/ 文件夹中。 在 xxx_param.py 中定义它的类对象,应该继承 python/federatedml/param/base_param.py 中定义的 BaseParam 类。
  2. 参数类的 __init__ 方法应该指定模块使用的所有参数。
  3. 重载 BaseParam 的参数检查接口,否则将会抛出未实现的错误。检查方法被用于验证参数变量是否可用。

以 hetero lr 的参数对象为例,python文件为 federatedml/param/logistic_regression_param.py

首先,它继承自 BaseParam:

class LogisticParam(BaseParam):

然后,在 __init__ 方法中定义所有参数变量:

def __init__(self, penalty='L2',
                 tol=1e-4, alpha=1.0, optimizer='rmsprop',
                 batch_size=-1, learning_rate=0.01, init_param=InitParam(),
                 max_iter=100, early_stop='diff', encrypt_param=EncryptParam(),
                 predict_param=PredictParam(), cv_param=CrossValidationParam(),
                 decay=1, decay_sqrt=True,
                 multi_class='ovr', validation_freqs=None, early_stopping_rounds=None,
                 stepwise_param=StepwiseParam(), floating_point_precision=23,
                 metrics=None,
                 use_first_metric_only=False,
                 callback_param=CallbackParam()
                 ):
        super(LogisticParam, self).__init__()
        self.penalty = penalty
        self.tol = tol
        self.alpha = alpha
        self.optimizer = optimizer
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.init_param = copy.deepcopy(init_param)
        self.max_iter = max_iter
        self.early_stop = early_stop
        self.encrypt_param = encrypt_param
        self.predict_param = copy.deepcopy(predict_param)
        self.cv_param = copy.deepcopy(cv_param)
        self.decay = decay
        self.decay_sqrt = decay_sqrt
        self.multi_class = multi_class
        self.validation_freqs = validation_freqs
        self.stepwise_param = copy.deepcopy(stepwise_param)
        self.early_stopping_rounds = early_stopping_rounds
        self.metrics = metrics or []
        self.use_first_metric_only = use_first_metric_only
        self.floating_point_precision = floating_point_precision
        self.callback_param = copy.deepcopy(callback_param)

如上面的示例所示,该参数也可以是 Param 类。此类参数的默认设置是此类的一个实例。然后将该实例的深度复制(deepcopy)版本分配给类归属。深度复制功能用于避免任务同时运行时指向相同内存的风险。

一旦正确定义了类,已有的参数解析器就可以递归地解析每个属性的值。

之后,重载参数检查的接口:

def check(self):
    descr = "logistic_param's"

    if type(self.penalty).__name__ != "str":
        raise ValueError(
            "logistic_param's penalty {} not supported, should be str type".format(self.penalty))
    else:
        self.penalty = self.penalty.upper()
        if self.penalty not in ['L1', 'L2', 'NONE']:
            raise ValueError(
                "logistic_param's penalty not supported, penalty should be 'L1', 'L2' or 'none'")

    if type(self.eps).__name__ != "float":
        raise ValueError(
            "logistic_param's eps {} not supported, should be float type".format(self.eps))

第二步:定义新模块的meta文件

定义meta文件是为了使 FATE-Flow 模块通过该文件以获取有关如何启动模块程序的信息。

  1. components 中定义名为 xxx.py 的meta文件,其中 xxx 是您要开发的模块。

  2. 配置 meta 文件。

    • 继承 ComponentMeta, 用模块名为其命名, 例如 xxx_cpn_meta = ComponentMeta("XXX"). XXX 即在 dsl 中调用的模块名。

          from .components import ComponentMeta
          hetero_lr_cpn_meta = ComponentMeta("HeteroLR")
      

    • 使用装饰器 xxx_cpn_meta.bind_runner.on_$role将模块object绑定至每个角色。 $role 包括 guest\host\arbiter. 如果多个角色使用同一模块object,可以使用 xxx_cpn_meta.bind_runner.on_$role1.on_$role2.on_$role3 格式注明。 装饰器方程将引入并返回对应角色的模块object。

      以hetero-lr 为例: python/federatedml/components/hetero_lr.py

          @hetero_lr_cpn_meta.bind_runner.on_guest
          def hetero_lr_runner_guest():
              from federatedml.linear_model.coordinated_linear_model.logistic_regression import HeteroLRGuest
      
              return HeteroLRGuest
      
          @hetero_lr_cpn_meta.bind_runner.on_host
          def hetero_lr_runner_host():
              from federatedml.linear_model.coordinated_linear_model.logistic_regression import HeteroLRHost
      
              return HeteroLRHost
      
    • 使用装饰器 xxx_cpn_meta.bind_param 将参数object绑定至step1中定义的开发组件, 装饰器将返回对应参数object。

          @hetero_lr_cpn_meta.bind_param
          def hetero_lr_param():
              from federatedml.param.logistic_regression_param import HeteroLogisticParam
      
              return HeteroLogisticParam
      

第三步:定义此模块的传递变量py文件并生成传递变量对象(可选)

仅在此模块需要联邦时(即不同参与方之间存在信息交互)才需要执行此步骤。

Note

应将其放在 transfer_class 文件夹中。

在该定义文件中,您需要创建需要的 transfer_variable类,并继承BaseTransferVariables类,然后定义相应的变量,并为其赋予需要的传输权限。以HeteroLRTransferVariable为例,可以参考以下代码:

from federatedml.transfer_variable.base_transfer_variable import BaseTransferVariables


# noinspection PyAttributeOutsideInit
class HeteroLRTransferVariable(BaseTransferVariables):
    def __init__(self, flowid=0):
        super().__init__(flowid)
        self.batch_data_index = self._create_variable(name='batch_data_index', src=['guest'], dst=['host'])
        self.batch_info = self._create_variable(name='batch_info', src=['guest'], dst=['host', 'arbiter'])
        self.converge_flag = self._create_variable(name='converge_flag', src=['arbiter'], dst=['host', 'guest'])
        self.fore_gradient = self._create_variable(name='fore_gradient', src=['guest'], dst=['host'])
        self.forward_hess = self._create_variable(name='forward_hess', src=['guest'], dst=['host'])
        self.guest_gradient = self._create_variable(name='guest_gradient', src=['guest'], dst=['arbiter'])
        self.guest_hess_vector = self._create_variable(name='guest_hess_vector', src=['guest'], dst=['arbiter'])
        self.guest_optim_gradient = self._create_variable(name='guest_optim_gradient', src=['arbiter'], dst=['guest'])
        self.host_forward_dict = self._create_variable(name='host_forward_dict', src=['host'], dst=['guest'])
        self.host_gradient = self._create_variable(name='host_gradient', src=['host'], dst=['arbiter'])
        self.host_hess_vector = self._create_variable(name='host_hess_vector', src=['host'], dst=['arbiter'])
        self.host_loss_regular = self._create_variable(name='host_loss_regular', src=['host'], dst=['guest'])
        self.host_optim_gradient = self._create_variable(name='host_optim_gradient', src=['arbiter'], dst=['host'])
        self.host_prob = self._create_variable(name='host_prob', src=['host'], dst=['guest'])
        self.host_sqn_forwards = self._create_variable(name='host_sqn_forwards', src=['host'], dst=['guest'])
        self.loss = self._create_variable(name='loss', src=['guest'], dst=['arbiter'])
        self.loss_intermediate = self._create_variable(name='loss_intermediate', src=['host'], dst=['guest'])
        self.paillier_pubkey = self._create_variable(name='paillier_pubkey', src=['arbiter'], dst=['host', 'guest'])
        self.sqn_sample_index = self._create_variable(name='sqn_sample_index', src=['guest'], dst=['host'])
        self.use_async = self._create_variable(name='use_async', src=['guest'], dst=['host'])

其中,需要设定的属性为:

  • name
    变量名

  • src
    应为 "guest","host","arbiter" 的某些组合,它表示发送交互信息从何处发出。

  • dst
    应为 "guest","host","arbiter" 的某些组合列表,用于定义将交互信息发送到何处。

第四步:定义您的模块(应继承 model_base

fate_flow_client 模块的运行规则是:

  1. 从数据库中检索fate的组件注册信息,获取component的每个role对应的运行对象。
  2. 初始化各方的运行对象。
  3. 调用运行对象的 run 方法。
  4. 如果需要,调用 save_data 方法。
  5. 如果需要,调用 export_model 方法。

在本节中,我们讲解如何执行规则 2 至 5。需要被继承的model_base类位于:python/federatedml/model_base.py。 - 重载 __init__ 接口
指定模块参数类型为第一步中定义的类.
hetero_lr_base.py 为例, 最后一行代码指定了新定义的模块的参数类型。

  ```python
    def __init__(self):
    super().__init__()
    self.model_name = 'HeteroLogisticRegression'
    self.model_param_name = 'HeteroLogisticRegressionParam'
    self.model_meta_name = 'HeteroLogisticRegressionMeta'
    self.mode = consts.HETERO
    self.aggregator = None
    self.cipher = None
    self.batch_generator = None
    self.gradient_loss_operator = None
    self.converge_procedure = None
    self.model_param = HeteroLogisticParam()
  ```
注: 这一步是强制的. 如果你不指定 `self.model_param`的值, 在 `_init_model(self, params)`方法中将不能获取params的值.
  • 在需要时重载 fit 接口

    fit 函数具有以下形式。

    def fit(self, train_data, validate_data=None):
    

    fit函数是启动建模组件的训练,或者特征工程组件的fit功能的入口。接受训练数据和验证集数据,validate数据可不提供。该函数在用户启动训练任务时,被model_base自动调起,您只需在该函数完成自身需要的fit任务即可。

  • 在需要的时候重载 predict 接口

    predict 函数具有如下形式.

    def predict(self, data_inst):
    

    data_inst 是一个 Table, 用于建模组件的预测功能。在用户启动预测任务时,将被model_base自动调起。 另外,在训练任务中,建模组件也会调用predict函数对训练数据和验证集数据(如果有)进行预测,并输出预测结果。该函数的返回结果,如果后续希望接入evaluation,需要输出符合下列格式的Table:

    • 二分类,多分类,回归任务返回一张表 表的格式为: ["label", "predict_result", "predict_score", "predict_detail", "type"]

      - `label`: 提供的标签
      - predict_result: 模型预测的结果
      - `predict_score`: 对于2分类为1的预测分数,对于多分类为概率最高的那一类的分数,对于回归任务,与predict\_result相同
      - `predict_detail`: 对于分类任务,列出各分类的得分,对于回归任务,列出回归预测值
      - `type`: 表明该结果来源(是训练数据或者是验证及数据),该结果`model_base`会自动拼接。
      
    • 聚类任务返回两张表

      第一张的格式为: ["cluster_sample_count", "cluster_inner_dist", "inter_cluster_dist"]
      
      - `cluster_sample_count`: 每个类别下的样本个数
      - `cluster_inner_dist`: 类内距离
      - `inter_cluster_dist`: 类间距离
      
      第二张表的格式为: `["predicted_cluster_index", "distance"]`
      
      - `predicted_cluster_index`: 预测的所属类别
      - `distance`: 该样本到中心点的距离
      
  • 在需要的时候重载 transform 接口

    transform 函数具有如下形式.

    def transform(self, data_inst):
    

    data_inst 是一个 Table, 用于特征工程组件对数据进行转化功能。在用户启动预测任务时,将被model_base自动调起。

  • 定义您的 save_data 接口
    以便 fate-flow 可以在需要时通过它获取输出数据。

    def save_data(self):
        return self.data_output
    

第五步: 定义模型保存所需的protobuf

定义模型保存所需的protobuf文件:

为了方便模型跨平台保存和读取模型,FATE使用protobuf文件定义每个模型所需的参数和模型内容。当您开发自己的模块时,需要定义本模块中需要保存的内容并创建相应的protobuf文件。protobuf文件所在的位置为 这个目录

更多使用protobuf的细节,请参考 这个教程

每个模型一般需要两个proto文件,其中后缀为meta的文件中保存某一次任务的配置,后缀为param的文件中保存某次任务的模型结果。

在完成proto文件的定义后,可执行protobuf目录下的 generate_py.sh文件 生成对应的python文件。之后,您可在自己的项目中引用自己设计的proto类型,并进行保存:

bash proto_generate.sh

定义 export_model 接口

以便 fate-flow 可以在需要时通过它获取输出的模型。应为同时包含 “Meta” 和 “Param” 包含了产生的proto buffer类的 dict 格式。这里展示了如何导出模型。

def export_model(self):
    meta_obj = self._get_meta()
    param_obj = self._get_param()
    result = {
        self.model_meta_name: meta_obj,
        self.model_param_name: param_obj
    }
    return result

第六步:开发Pipeline组件

若希望后续用户可以通过python脚本形式启动建模任务,需要在 python/fate_client/pipeline/component 中添加自己的组件。详情请参考Pipeline的 文档

第七步:重启fate flow服务

当上面的开发步骤都完成后,需要重启fate flow服务,否则后续提交任务可能会报一些错误如"新组件的provider找不到"。 fate flow服务也可通过debug模式启动,启动方式: "python fate_flow_server.py --debug", debug模式可以让修改的代码不重启也生效。

开始建模任务

这里给出开发完成后,启动建模任务的一个简单示例。

第一步: 上传数据

在开始任务之前,您需要加载来自所有提供者的数据。为此,需要准备load_file 配置,然后运行以下命令:

flow data upload -c upload_data.json

Note

每个数据提供节点(即guesthost)都需要执行此步骤。

第二步: 开始建模任务

在此步骤中,应准备两个与 dsl 配置文件和组件配置文件相对应的配置文件。请确保配置文件中的 table_namenamespaceupload_data conf 匹配。然后运行以下命令:

flow job submit -d ${your_dsl_file.json} -c ${your_component_conf_json}

若您已在fate_client中添加了自己的组件,也可以准备好自己的pipeline脚本,然后使用python命令直接启动:

python ${your_pipeline.py}

第三步: 检查日志文件

现在,您可以在以下路径中检查日志:$PROJECT_BASE/fateflow/logs/{your jobid}.

有关 dsl 配置文件和参数配置文件的更多详细信息,请参考此处的examples/dsl/v2中查看。


最后更新: 2022-04-13