Union¶
Union module combines given tables into one while keeping unique entry ids. Union is a local module. Like DataTransform, this module can be run on the side of Host or Guest, and running this module does not require any interaction with outside parties.
Use¶
Union currently only supports joining by entry id. For tables of data instances, their header, idx and label column name (if label exists) should match.
When an id appears more than once in the joining tables, user can specify whether to keep the duplicated instances by setting parameter keep_duplicate to True. Otherwise, only the entry from its first appearance will be kept in the final combined table. Note that the order by which tables being fed into Union module depends on the job setting. As shown below:
with FATE-Pipeline:
``` sourceCode python { "union_0": { "module": "Union", "input": { "data": { "data": ["data_transform_0.data", "data_transform_1.data", "data_transform_2.data"] } }, "output": { "data": ["data"] } } }
with DSL v2:
``` sourceCode json
{
"union_0": {
"module": "Union",
"input": {
"data": {
"data": ["data_transform_0.data", "data_transform_1.data", "data_transform_2.data"]
}
},
"output": {
"data": ["data"]
}
}
}
Upstream tables will enter Union module in this order: data_transform_0.data, data_transform_1.data, data_transform_2.data .
If an id 42 exists in both data_transform_0.data and data_transform_1.data, and:
- 'keep_duplicate` set to false: the value from data_transform_0.data is the one being kept in the final result, its id unchanged.
- 'keep_duplicate` set to true: the value from data_transform_0.data and the one from data_transform_1.data are both kept; the id in data_transform_0.data will be transformed to 42_data_transform_0, and the id in data_transform_1.data to 42_data_transform_1.
Param¶
union_param
¶
Attributes¶
Classes¶
UnionParam(need_run=True, allow_missing=False, keep_duplicate=False)
¶
Bases: BaseParam
Define the union method for combining multiple dTables and keep entries with the same id
Parameters:
Name | Type | Description | Default |
---|---|---|---|
need_run |
Indicate if this module needed to be run |
True
|
|
allow_missing |
Whether allow mismatch between feature length and header length in the result. Note that empty tables will always be skipped regardless of this param setting. |
False
|
|
keep_duplicate |
Whether to keep entries with duplicated keys. If set to True, a new id will be generated for duplicated entry in the format {id}_{table_name}. |
False
|
Source code in federatedml/param/union_param.py
38 39 40 41 42 |
|
Attributes¶
need_run = need_run
instance-attribute
¶allow_missing = allow_missing
instance-attribute
¶keep_duplicate = keep_duplicate
instance-attribute
¶Functions¶
check()
¶Source code in federatedml/param/union_param.py
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
|
Examples¶
Example
## Union Pipeline Example Usage Guide.
#### Example Tasks
This section introduces the Pipeline scripts for different types of tasks.
1. Unilateral Union Task:
script: pipeline-union-basic.py
2. Union Task on Both Guest & Host Sides:
script: pipeline-union.py
3. Union Task on Table:
script: pipeline-union-data-transform.py
4. Union Task on TagValue Table (with duplicated ids):
script: pipeline-union-tag-value.py
Users can run a pipeline job directly:
python ${pipeline_script}
pipeline-union.py
import argparse
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Evaluation
from pipeline.component import HeteroLR
from pipeline.component import Intersection
from pipeline.component import Reader
from pipeline.component import Union
from pipeline.interface import Data
from pipeline.interface import Model
from pipeline.utils.tools import load_job_config
def main(config="../../config.yaml", namespace=""):
# obtain config
if isinstance(config, str):
config = load_job_config(config)
parties = config.parties
guest = parties.guest[0]
host = parties.host[0]
arbiter = parties.arbiter[0]
guest_train_data = {"name": "breast_hetero_guest", "namespace": f"experiment{namespace}"}
host_train_data = {"name": "breast_hetero_host", "namespace": f"experiment{namespace}"}
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest, host=host, arbiter=arbiter)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
reader_0.get_party_instance(role='host', party_id=host).component_param(table=host_train_data)
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
reader_1.get_party_instance(role='host', party_id=host).component_param(table=host_train_data)
data_transform_0 = DataTransform(name="data_transform_0")
data_transform_1 = DataTransform(name="data_transform_1")
data_transform_0.get_party_instance(
role='guest', party_id=guest).component_param(
with_label=True, output_format="dense")
data_transform_0.get_party_instance(role='host', party_id=host).component_param(with_label=False)
data_transform_1.get_party_instance(
role='guest', party_id=guest).component_param(
with_label=True, output_format="dense")
data_transform_1.get_party_instance(role='host', party_id=host).component_param(with_label=False)
intersect_0 = Intersection(name="intersection_0")
intersect_1 = Intersection(name="intersection_1")
union_0 = Union(name="union_0")
hetero_lr_0 = HeteroLR(name="hetero_lr_0", max_iter=3, early_stop="weight_diff",
optimizer="nesterov_momentum_sgd", tol=1E-4, alpha=0.01,
learning_rate=0.15, init_param={"init_method": "random_uniform"})
evaluation_0 = Evaluation(name="evaluation_0", eval_type="binary", pos_label=1)
evaluation_0.get_party_instance(role='host', party_id=host).component_param(need_run=False)
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(
data_transform_1, data=Data(
data=reader_1.output.data), model=Model(
data_transform_0.output.model))
pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
pipeline.add_component(intersect_1, data=Data(data=data_transform_1.output.data))
pipeline.add_component(union_0, data=Data(data=[intersect_0.output.data, intersect_1.output.data]))
pipeline.add_component(hetero_lr_0, data=Data(train_data=union_0.output.data))
pipeline.add_component(evaluation_0, data=Data(data=hetero_lr_0.output.data))
pipeline.compile()
pipeline.fit()
if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str,
help="config file")
args = parser.parse_args()
if args.config is not None:
main(args.config)
else:
main()
pipeline-union-data-transform.py
import argparse
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Reader
from pipeline.component import Union
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config
def main(config="../../config.yaml", namespace=""):
# obtain config
if isinstance(config, str):
config = load_job_config(config)
parties = config.parties
guest = parties.guest[0]
guest_train_data = [{"name": "breast_hetero_guest", "namespace": f"experiment{namespace}"},
{"name": "breast_hetero_guest", "namespace": f"experiment{namespace}"}]
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data[0])
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data[1])
union_0 = Union(name="union_0", allow_missing=False, keep_duplicate=True)
data_transform_0 = DataTransform(name="data_transform_0", with_label=True, output_format="dense", label_name="y",
missing_fill=False, outlier_replace=False)
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)
pipeline.add_component(union_0, data=Data(data=[reader_0.output.data, reader_1.output.data]))
pipeline.add_component(data_transform_0, data=Data(data=union_0.output.data))
pipeline.compile()
pipeline.fit()
if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str,
help="config file")
args = parser.parse_args()
if args.config is not None:
main(args.config)
else:
main()
pipeline-union-tag-value.py
import argparse
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Reader
from pipeline.component import Union
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config
def main(config="../../config.yaml", namespace=""):
# obtain config
if isinstance(config, str):
config = load_job_config(config)
parties = config.parties
guest = parties.guest[0]
guest_train_data = [{"name": "tag_value_1", "namespace": f"experiment{namespace}"},
{"name": "tag_value_2", "namespace": f"experiment{namespace}"},
{"name": "tag_value_3", "namespace": f"experiment{namespace}"}]
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data[0])
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data[1])
reader_2 = Reader(name="reader_2")
reader_2.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data[2])
union_0 = Union(name="union_0", allow_missing=False, keep_duplicate=True, need_run=True)
data_transform_0 = DataTransform(name="data_transform_0", input_format="tag", with_label=False, tag_with_value=True,
delimitor=",", output_format="dense")
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)
pipeline.add_component(reader_2)
pipeline.add_component(union_0, data=Data(data=[reader_0.output.data, reader_1.output.data, reader_2.output.data]))
pipeline.add_component(data_transform_0, data=Data(data=union_0.output.data))
pipeline.compile()
pipeline.fit()
if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str,
help="config file")
args = parser.parse_args()
if args.config is not None:
main(args.config)
else:
main()
pipeline-union-basic.py
import argparse
from pipeline.backend.pipeline import PipeLine
from pipeline.component import DataTransform
from pipeline.component import Reader
from pipeline.component import Union
from pipeline.interface import Data
from pipeline.interface import Model
from pipeline.utils.tools import load_job_config
def main(config="../../config.yaml", namespace=""):
# obtain config
if isinstance(config, str):
config = load_job_config(config)
parties = config.parties
guest = parties.guest[0]
guest_train_data = {"name": "breast_hetero_guest", "namespace": f"experiment{namespace}"}
pipeline = PipeLine().set_initiator(role='guest', party_id=guest).set_roles(guest=guest)
reader_0 = Reader(name="reader_0")
reader_0.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
reader_1 = Reader(name="reader_1")
reader_1.get_party_instance(role='guest', party_id=guest).component_param(table=guest_train_data)
data_transform_0 = DataTransform(name="data_transform_0", with_label=True, output_format="dense", label_name="y",
missing_fill=False, outlier_replace=False)
data_transform_1 = DataTransform(name="data_transform_1", with_label=True, output_format="dense", label_name="y",
missing_fill=False, outlier_replace=False)
union_0 = Union(name="union_0", allow_missing=False, need_run=True)
pipeline.add_component(reader_0)
pipeline.add_component(reader_1)
pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
pipeline.add_component(
data_transform_1, data=Data(
data=reader_1.output.data), model=Model(
data_transform_0.output.model))
pipeline.add_component(union_0, data=Data(data=[data_transform_0.output.data, data_transform_1.output.data]))
pipeline.compile()
pipeline.fit()
if __name__ == "__main__":
parser = argparse.ArgumentParser("PIPELINE DEMO")
parser.add_argument("-config", type=str,
help="config file")
args = parser.parse_args()
if args.config is not None:
main(args.config)
else:
main()
init.py
union_testsuite.json
{
"data": [
{
"file": "examples/data/breast_hetero_guest.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/breast_hetero_host.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_host",
"namespace": "experiment",
"role": "host_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_1",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_2",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_3",
"namespace": "experiment",
"role": "guest_0"
}
],
"pipeline_tasks": {
"union": {
"script": "./pipeline-union.py"
},
"union-basic": {
"script": "./pipeline-union-basic.py"
},
"union-data-transform": {
"script": "./pipeline-union-data-transform.py"
},
"union-tag": {
"script": "./pipeline-union-tag-value.py"
}
}
}
## Union Configuration Usage Guide.
#### Example Tasks
This section introduces the dsl and conf for different types of tasks.
1. Unilateral Union Task:
dsl: test_union_basic_job_dsl.json
runtime_config : test_union_basic_job_conf.json
2. Union Task on Both Guest & Host Sides:
dsl: test_union_job_dsl.json
runtime_config : test_union_job_conf.json
3. Union Task on Table:
dsl: test_union_data_transform_job_dsl.json
runtime_config: test_union_data_transform_job_conf.json
4. Union Task on TagValue Table (with duplicated ids):
dsl: test_union_tag_value_job_dsl.json
runtime_config: test_union_tag_value_job_conf.json
Users can use following commands to run the task.
flow job submit -c ${runtime_config} -d ${dsl}
After having finished a successful training task, you can use FATE Board to check output.
test_union_basic_job_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"reader_1": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"data_transform_1": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_1.data"
]
},
"model": [
"data_transform_0.model"
]
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"union_0": {
"module": "Union",
"input": {
"data": {
"data": [
"data_transform_0.data",
"data_transform_1.data"
]
}
},
"output": {
"data": [
"data"
]
}
}
}
}
test_union_basic_job_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"guest": [
9999
]
},
"component_parameters": {
"common": {
"data_transform_0": {
"missing_fill": false,
"outlier_replace": false,
"with_label": true,
"label_name": "y",
"output_format": "dense"
},
"data_transform_1": {
"missing_fill": false,
"outlier_replace": false,
"with_label": true,
"label_name": "y",
"output_format": "dense"
},
"union_0": {
"allow_missing": false,
"need_run": true
}
},
"role": {
"guest": {
"0": {
"reader_1": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
},
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
}
}
}
}
}
}
test_union_data_transform_job_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"reader_1": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"union_0": {
"module": "Union",
"input": {
"data": {
"data": [
"reader_0.data",
"reader_1.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"union_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}
test_union_tag_value_job_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"reader_1": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"reader_2": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"union_0": {
"module": "Union",
"input": {
"data": {
"data": [
"reader_0.data",
"reader_1.data",
"reader_2.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"union_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}
test_union_job_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"arbiter": [
10000
],
"host": [
10000
],
"guest": [
9999
]
},
"component_parameters": {
"common": {
"hetero_lr_0": {
"tol": 0.0001,
"alpha": 0.01,
"optimizer": "nesterov_momentum_sgd",
"learning_rate": 0.15,
"init_param": {
"init_method": "random_uniform"
},
"max_iter": 30,
"early_stop": "weight_diff"
},
"evaluation_0": {
"eval_type": "binary",
"pos_label": 1
}
},
"role": {
"guest": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": true,
"output_format": "dense"
},
"data_transform_1": {
"with_label": true,
"output_format": "dense"
},
"reader_1": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
}
}
},
"host": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": false
},
"evaluation_0": {
"need_run": false
},
"data_transform_1": {
"with_label": false
},
"reader_1": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
}
}
}
}
}
}
test_union_job_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"reader_1": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"data_transform_1": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_1.data"
]
},
"model": [
"data_transform_0.model"
]
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"intersection_1": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_1.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"union_0": {
"module": "Union",
"input": {
"data": {
"data": [
"intersection_0.data",
"intersection_1.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"hetero_lr_0": {
"module": "HeteroLR",
"input": {
"data": {
"train_data": [
"union_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"evaluation_0": {
"module": "Evaluation",
"input": {
"data": {
"data": [
"hetero_lr_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
}
}
}
test_union_tag_value_job_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"guest": [
9999
]
},
"component_parameters": {
"common": {
"union_0": {
"need_run": true,
"allow_missing": false,
"keep_duplicate": true
},
"data_transform_0": {
"input_format": "tag",
"delimitor": ",",
"tag_with_value": true,
"with_label": false,
"output_format": "dense"
}
},
"role": {
"guest": {
"0": {
"reader_1": {
"table": {
"name": "tag_value_2",
"namespace": "experiment"
}
},
"reader_2": {
"table": {
"name": "tag_value_3",
"namespace": "experiment"
}
},
"reader_0": {
"table": {
"name": "tag_value_1",
"namespace": "experiment"
}
}
}
}
}
}
}
test_union_data_transform_job_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"guest": [
9999
]
},
"component_parameters": {
"common": {
"union_0": {
"allow_missing": false,
"keep_duplicate": true
},
"data_transform_0": {
"missing_fill": false,
"outlier_replace": false,
"with_label": true,
"label_name": "y",
"output_format": "dense"
}
},
"role": {
"guest": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
},
"reader_1": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
}
}
}
}
}
}
union_testsuite.json
{
"data": [
{
"file": "examples/data/breast_hetero_guest.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/breast_hetero_host.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_host",
"namespace": "experiment",
"role": "host_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_1",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_2",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/tag_value_1000_140.csv",
"head": 0,
"partition": 16,
"table_name": "tag_value_3",
"namespace": "experiment",
"role": "guest_0"
}
],
"tasks": {
"union": {
"conf": "./test_union_job_conf.json",
"dsl": "./test_union_job_dsl.json"
},
"union-basic": {
"conf": "./test_union_basic_job_conf.json",
"dsl": "./test_union_basic_job_dsl.json"
},
"union-data-transform": {
"conf": "./test_union_data_transform_job_conf.json",
"dsl": "./test_union_data_transform_job_dsl.json"
},
"union-tag": {
"conf": "./test_union_tag_value_job_conf.json",
"dsl": "./test_union_tag_value_job_dsl.json"
}
}
}