Heterogeneous Pearson Correlation Coefficient¶
Introduction¶
Pearson Correlation Coefficient is a measure of the linear correlation between two variables, X and Y, defined as,

$$\rho_{X,Y} = \frac{\operatorname{cov}(X, Y)}{\sigma_X \sigma_Y} = \frac{E\left[(X - \mu_X)(Y - \mu_Y)\right]}{\sigma_X \sigma_Y}$$

Let

$$\tilde{X} = \frac{X - \mu_X}{\sigma_X}, \qquad \tilde{Y} = \frac{Y - \mu_Y}{\sigma_Y}$$

then,

$$\rho_{X,Y} = E[\tilde{X}\tilde{Y}]$$
Implementation Detail¶
We use an MPC protocol called SPDZ for Heterogeneous Pearson Correlation Coefficient calculation. For more details, one can refer to [here]
Param¶
pearson_param
¶
Classes¶
PearsonParam (BaseParam)
¶
param for pearson correlation
Parameters:
Name | Type | Description | Default |
---|---|---|---|
column_names |
list of string |
list of column names |
None |
column_indexes |
list of int |
list of column indexes |
None |
cross_parties |
bool, default: True |
if True, calculate correlation of columns from both parties |
True |
need_run |
bool |
set False to skip this party |
True |
use_mix_rand |
bool, default: False |
mix system random and pseudo random for quicker calculation |
False |
calc_local_vif |
bool, default: True |
calculate VIF for columns in local |
True |
Source code in federatedml/param/pearson_param.py
class PearsonParam(BaseParam):
    """Parameter holder for the hetero pearson correlation component.

    Parameters
    ----------
    column_names : list of string
        list of column names
    column_indexes : list of int, or int -1
        list of column indexes; -1 means use all columns
    cross_parties : bool, default: True
        if True, calculate correlation of columns from both parties
    need_run : bool, default: True
        set False to skip this party
    use_mix_rand : bool, default: False
        mix system random and pseudo random for quicker calculation
    calc_local_vif : bool, default: True
        calculate VIF for columns in local
    """

    def __init__(
        self,
        column_names=None,
        column_indexes=None,
        cross_parties=True,
        need_run=True,
        use_mix_rand=False,
        calc_local_vif=True,
    ):
        super().__init__()
        self.column_names = column_names
        self.column_indexes = column_indexes
        self.cross_parties = cross_parties
        self.need_run = need_run
        self.use_mix_rand = use_mix_rand
        # normalize None to fresh empty lists so check() and downstream code
        # can iterate without guarding against None (also avoids a shared
        # mutable default)
        if column_names is None:
            self.column_names = []
        if column_indexes is None:
            self.column_indexes = []
        self.calc_local_vif = calc_local_vif

    def check(self):
        """Validate parameter values; raise ValueError on the first violation."""
        if not isinstance(self.use_mix_rand, bool):
            raise ValueError(
                f"use_mix_rand accept bool type only, {type(self.use_mix_rand)} got"
            )
        if self.cross_parties and (not self.need_run):
            raise ValueError(
                "need_run should be True(which is default) when cross_parties is True."
            )
        if not isinstance(self.column_names, list):
            raise ValueError(
                f"type mismatch, column_names with type {type(self.column_names)}"
            )
        for name in self.column_names:
            if not isinstance(name, str):
                raise ValueError(
                    f"type mismatch, column_names with element {name}(type is {type(name)})"
                )
        if isinstance(self.column_indexes, list):
            for idx in self.column_indexes:
                if not isinstance(idx, int):
                    raise ValueError(
                        f"type mismatch, column_indexes with element {idx}(type is {type(idx)})"
                    )
        # as a bare int, only the sentinel -1 (meaning "all columns") is allowed
        if isinstance(self.column_indexes, int) and self.column_indexes != -1:
            raise ValueError(
                f"column_indexes with type int and value {self.column_indexes}(only -1 allowed)"
            )
        if self.need_run:
            if isinstance(self.column_indexes, list) and isinstance(
                self.column_names, list
            ):
                if len(self.column_indexes) == 0 and len(self.column_names) == 0:
                    raise ValueError("provide at least one column")
__init__(self, column_names=None, column_indexes=None, cross_parties=True, need_run=True, use_mix_rand=False, calc_local_vif=True)
special
¶Source code in federatedml/param/pearson_param.py
def __init__(
    self,
    column_names=None,
    column_indexes=None,
    cross_parties=True,
    need_run=True,
    use_mix_rand=False,
    calc_local_vif=True,
):
    """Store pearson correlation parameters, defaulting None column args to []."""
    super().__init__()
    # fresh empty lists instead of None (also avoids mutable-default pitfalls)
    self.column_names = [] if column_names is None else column_names
    self.column_indexes = [] if column_indexes is None else column_indexes
    self.cross_parties = cross_parties
    self.need_run = need_run
    self.use_mix_rand = use_mix_rand
    self.calc_local_vif = calc_local_vif
check(self)
¶Source code in federatedml/param/pearson_param.py
def check(self):
    """Validate parameter types and combinations, raising ValueError on violation."""
    # use_mix_rand must be a real bool, not merely truthy
    if not isinstance(self.use_mix_rand, bool):
        raise ValueError(
            f"use_mix_rand accept bool type only, {type(self.use_mix_rand)} got"
        )
    # a party that computes cross-party correlation must itself run
    if self.cross_parties and not self.need_run:
        raise ValueError(
            "need_run should be True(which is default) when cross_parties is True."
        )
    if not isinstance(self.column_names, list):
        raise ValueError(
            f"type mismatch, column_names with type {type(self.column_names)}"
        )
    for col_name in self.column_names:
        if not isinstance(col_name, str):
            raise ValueError(
                f"type mismatch, column_names with element {col_name}(type is {type(col_name)})"
            )
    if isinstance(self.column_indexes, list):
        for col_idx in self.column_indexes:
            if not isinstance(col_idx, int):
                raise ValueError(
                    f"type mismatch, column_indexes with element {col_idx}(type is {type(col_idx)})"
                )
    # as a bare int, only the sentinel -1 is accepted
    if isinstance(self.column_indexes, int) and self.column_indexes != -1:
        raise ValueError(
            f"column_indexes with type int and value {self.column_indexes}(only -1 allowed)"
        )
    no_indexes = isinstance(self.column_indexes, list) and not self.column_indexes
    no_names = isinstance(self.column_names, list) and not self.column_names
    if self.need_run and no_indexes and no_names:
        raise ValueError("provide at least one column")
How to Use¶
-
params
-
column_indexes
-1 or list of int. If -1 provided, all columns are used for calculation. If a list of int provided, columns with given indexes are used for calculation. -
column_names
names of columns used for calculation.
Note
if both params are provided, the union of columns indicated are used for calculation.
Examples¶
Example
## Hetero Pearson Pipeline Example Usage Guide.
#### Example Tasks
This section introduces the Pipeline scripts for different types of tasks.
1. Cross parties Task:
script: pipeline_hetero_pearson.py
2. Host only Task:
script: pipeline_hetero_pearson_host_only.py
3. Sole Task:
script: pipeline_hetero_pearson_sole.py
4. Use Mix Rand schema Task:
script: pipeline_hetero_pearson_mix_rand.py
Users can run a pipeline job directly:
python ${pipeline_script}
pipeline_hetero_pearson_mix_rand.py
import os
import sys

# make the examples root importable when run as a script
additional_path = os.path.realpath(
    os.path.join(os.path.realpath(__file__), os.path.pardir, os.path.pardir)
)
if additional_path not in sys.path:
    sys.path.append(additional_path)

from hetero_pearson._common_component import run_pearson_pipeline, dataset


def main(config="../../config.yaml", namespace=""):
    """Run hetero pearson on all columns with the mixed-random scheme enabled."""
    pipeline = run_pearson_pipeline(
        config=config,
        namespace=namespace,
        data=dataset.breast,
        common_param=dict(column_indexes=-1, use_mix_rand=True),
    )
    print(pipeline.get_component("hetero_pearson_0").get_model_param())
    print(pipeline.get_component("hetero_pearson_0").get_summary())
pipeline_hetero_pearson.py
import os
import sys

# make the examples root importable when run as a script
additional_path = os.path.realpath(
    os.path.join(os.path.realpath(__file__), os.path.pardir, os.path.pardir)
)
if additional_path not in sys.path:
    sys.path.append(additional_path)

from hetero_pearson._common_component import run_pearson_pipeline, dataset


def main(config="../../config.yaml", namespace=""):
    """Run the default cross-parties hetero pearson example on all columns."""
    pipeline = run_pearson_pipeline(
        config=config,
        namespace=namespace,
        data=dataset.breast,
        common_param=dict(column_indexes=-1),
    )
    print(pipeline.get_component("hetero_pearson_0").get_model_param())
    print(pipeline.get_component("hetero_pearson_0").get_summary())
init.py
import os
import sys

# expose the parent directory so sibling example packages can be imported
additional_path = os.path.realpath(os.path.pardir)
if additional_path not in sys.path:
    sys.path.append(additional_path)
pipeline_hetero_pearson_host_only.py
import os
import sys

# make the examples root importable when run as a script
additional_path = os.path.realpath(
    os.path.join(os.path.realpath(__file__), os.path.pardir, os.path.pardir)
)
if additional_path not in sys.path:
    sys.path.append(additional_path)

from hetero_pearson._common_component import run_pearson_pipeline, dataset


def main(config="../../config.yaml", namespace=""):
    """Host-only hetero pearson: the guest party is configured to skip the run."""
    run_pearson_pipeline(
        config=config,
        namespace=namespace,
        data=dataset.breast,
        common_param=dict(column_indexes=-1, cross_parties=False),
        guest_only_param=dict(need_run=False),
    )
hetero_pearson_testsuite.json
{
"data": [
{
"file": "examples/data/breast_hetero_guest.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/breast_hetero_host.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_host",
"namespace": "experiment",
"role": "host_0"
}
],
"pipeline_tasks": {
"default": {
"script": "./pipeline_hetero_pearson.py"
},
"host_only": {
"script": "./pipeline_hetero_pearson_host_only.py"
},
"sole": {
"script": "./pipeline_hetero_pearson_sole.py"
},
"mix_rand": {
"script": "./pipeline_hetero_pearson_mix_rand.py"
}
}
}
runner.py
import argparse
import os
import sys
from enum import Enum
additional_path = os.path.realpath('../')
if additional_path not in sys.path:
sys.path.append(additional_path)
class PearsonExample(Enum):
    """Named pipeline example variants selectable from the command line."""

    DEFAULT = "default"
    HOST_ONLY = "host_only"
    SOLE = "sole"
    MIX_RAND = "mix_rand"

    def __str__(self):
        # argparse renders choices through str(); show the member name
        return self.name

    @staticmethod
    def from_string(s: str):
        """Look up a member by case-insensitive name; raise ValueError if unknown."""
        key = s.upper()
        if key not in PearsonExample.__members__:
            raise ValueError()
        return PearsonExample.__members__[key]
if __name__ == '__main__':
    parser = argparse.ArgumentParser("PIPELINE DEMO")
    parser.add_argument("-config", type=str, help="config file")
    parser.add_argument(
        "-example",
        type=PearsonExample.from_string,
        required=True,
        choices=list(PearsonExample),
        help="example to run",
    )
    args = parser.parse_args()
    kwargs = {} if args.config is None else {"config": args.config}
    example: PearsonExample = args.example
    # import lazily so only the selected example module is loaded
    if example is PearsonExample.DEFAULT:
        from hetero_pearson.pipeline_hetero_pearson import main
    elif example is PearsonExample.SOLE:
        from hetero_pearson.pipeline_hetero_pearson_sole import main
    elif example is PearsonExample.HOST_ONLY:
        from hetero_pearson.pipeline_hetero_pearson_host_only import main
    elif example is PearsonExample.MIX_RAND:
        from hetero_pearson.pipeline_hetero_pearson_mix_rand import main
    else:
        raise NotImplementedError(example)
    main(**kwargs)
_common_component.py
import argparse
from pipeline.backend.pipeline import PipeLine
from pipeline.component import HeteroPearson
from pipeline.component import DataTransform
from pipeline.component import Intersection
from pipeline.component import Reader
from pipeline.interface import Data
from pipeline.utils.tools import load_job_config
class dataset_meta(type):
    """Metaclass exposing named example datasets as class-level properties."""

    @property
    def breast(cls):
        # guest holds a single table; the host side is a list so multiple
        # hosts can be described with the same structure
        return {
            "guest": {"name": "breast_hetero_guest", "namespace": "experiment"},
            "host": [{"name": "breast_hetero_host", "namespace": "experiment"}],
        }


class dataset(metaclass=dataset_meta):
    """Accessor for example dataset descriptors, e.g. ``dataset.breast``."""
def run_pearson_pipeline(
    config,
    namespace,
    data,
    common_param=None,
    guest_only_param=None,
    host_only_param=None,
):
    """Build, fit and return a hetero pearson pipeline.

    Wires reader -> data_transform -> intersection -> hetero_pearson and
    applies per-role component params when provided.
    """
    if isinstance(config, str):
        config = load_job_config(config)
    guest_party = config.parties.guest[0]
    host_party = config.parties.host[0]

    guest_data = data["guest"]
    host_data = data["host"][0]
    # suffix table namespaces so runs under different namespaces don't collide
    guest_data["namespace"] = f"{guest_data['namespace']}{namespace}"
    host_data["namespace"] = f"{host_data['namespace']}{namespace}"

    pipeline = PipeLine().set_initiator(role="guest", party_id=guest_party)
    pipeline = pipeline.set_roles(guest=guest_party, host=host_party)

    reader_0 = Reader(name="reader_0")
    reader_0.get_party_instance(role="guest", party_id=guest_party).component_param(
        table=guest_data
    )
    reader_0.get_party_instance(role="host", party_id=host_party).component_param(
        table=host_data
    )

    data_transform_0 = DataTransform(name="data_transform_0")
    data_transform_0.get_party_instance(
        role="guest", party_id=guest_party
    ).component_param(with_label=True, output_format="dense")
    data_transform_0.get_party_instance(
        role="host", party_id=host_party
    ).component_param(with_label=False)

    intersect_0 = Intersection(name="intersection_0")

    pearson = HeteroPearson(name="hetero_pearson_0", **(common_param or {}))
    if guest_only_param:
        pearson.get_party_instance("guest", guest_party).component_param(
            **guest_only_param
        )
    if host_only_param:
        pearson.get_party_instance("host", host_party).component_param(
            **host_only_param
        )

    pipeline.add_component(reader_0)
    pipeline.add_component(data_transform_0, data=Data(data=reader_0.output.data))
    pipeline.add_component(intersect_0, data=Data(data=data_transform_0.output.data))
    pipeline.add_component(pearson, data=Data(train_data=intersect_0.output.data))
    pipeline.compile()
    pipeline.fit()
    return pipeline
def runner(main_func):
    """Parse the optional ``-config`` flag and invoke *main_func* with it."""
    parser = argparse.ArgumentParser("PIPELINE DEMO")
    parser.add_argument("-config", type=str, help="config file")
    parsed = parser.parse_args()
    if parsed.config is None:
        main_func()
    else:
        main_func(parsed.config)
pipeline_hetero_pearson_sole.py
import os
import sys

# make the examples root importable when run as a script
additional_path = os.path.realpath(
    os.path.join(os.path.realpath(__file__), os.path.pardir, os.path.pardir)
)
if additional_path not in sys.path:
    sys.path.append(additional_path)

from hetero_pearson._common_component import run_pearson_pipeline, dataset


def main(config="../../config.yaml", namespace=""):
    """Sole (single-party) hetero pearson: correlation within each party only."""
    pipeline = run_pearson_pipeline(
        config=config,
        namespace=namespace,
        data=dataset.breast,
        common_param=dict(column_indexes=-1, cross_parties=False),
    )
    print(pipeline.get_component("hetero_pearson_0").get_model_param())
    print(pipeline.get_component("hetero_pearson_0").get_summary())
## Hetero Pearson Configuration Usage Guide.
This section introduces the dsl and conf for usage of different type of task.
#### Training Task.
1. Base Cross Parties Task:
dsl: test_hetero_pearson_default_dsl.json
runtime_config : test_hetero_pearson_default_conf.json
2. Host Only Task:
dsl: test_hetero_pearson_host_only_dsl.json
runtime_config : test_hetero_pearson_host_only_conf.json
3. Sole Task:
dsl: test_hetero_pearson_sole_dsl.json
runtime_config : test_hetero_pearson_sole_conf.json
4. Use Mix Rand Task:
dsl: test_hetero_pearson_mix_rand_dsl.json
runtime_config : test_hetero_pearson_mix_rand_conf.json
Users can use following commands to run a task.
flow job submit -c ${runtime_config} -d ${dsl}
test_hetero_pearson_sole_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"hetero_pearson_0": {
"module": "HeteroPearson",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}
test_hetero_pearson_default_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"hetero_pearson_0": {
"module": "HeteroPearson",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}
test_hetero_pearson_mix_rand_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"host": [
10000
],
"guest": [
9999
]
},
"component_parameters": {
"common": {
"hetero_pearson_0": {
"column_indexes": -1,
"use_mix_rand": true
}
},
"role": {
"host": {
"0": {
"data_transform_0": {
"with_label": false
},
"reader_0": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
}
}
},
"guest": {
"0": {
"data_transform_0": {
"with_label": true,
"output_format": "dense"
},
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
}
}
}
}
}
}
test_hetero_pearson_default_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"host": [
10000
],
"guest": [
9999
]
},
"component_parameters": {
"common": {
"hetero_pearson_0": {
"column_indexes": -1
}
},
"role": {
"guest": {
"0": {
"data_transform_0": {
"with_label": true,
"output_format": "dense"
},
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
}
}
},
"host": {
"0": {
"data_transform_0": {
"with_label": false
},
"reader_0": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
}
}
}
}
}
}
test_hetero_pearson_sole_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"host": [
10000
],
"guest": [
9999
]
},
"component_parameters": {
"common": {
"hetero_pearson_0": {
"column_indexes": -1,
"cross_parties": false
}
},
"role": {
"host": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": false
}
}
},
"guest": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": true,
"output_format": "dense"
}
}
}
}
}
}
test_hetero_pearson_mix_rand_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"hetero_pearson_0": {
"module": "HeteroPearson",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}
hetero_pearson_testsuite.json
{
"data": [
{
"file": "examples/data/breast_hetero_guest.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_guest",
"namespace": "experiment",
"role": "guest_0"
},
{
"file": "examples/data/breast_hetero_host.csv",
"head": 1,
"partition": 16,
"table_name": "breast_hetero_host",
"namespace": "experiment",
"role": "host_0"
}
],
"tasks": {
"default": {
"conf": "./test_hetero_pearson_default_conf.json",
"dsl": "./test_hetero_pearson_default_dsl.json"
},
"host_only": {
"conf": "./test_hetero_pearson_host_only_conf.json",
"dsl": "./test_hetero_pearson_host_only_dsl.json"
},
"sole": {
"conf": "./test_hetero_pearson_sole_conf.json",
"dsl": "./test_hetero_pearson_sole_dsl.json"
},
"mix_rand": {
"conf": "./test_hetero_pearson_mix_rand_conf.json",
"dsl": "./test_hetero_pearson_mix_rand_dsl.json"
}
}
}
test_hetero_pearson_host_only_conf.json
{
"dsl_version": 2,
"initiator": {
"role": "guest",
"party_id": 9999
},
"role": {
"host": [
10000
],
"guest": [
9999
]
},
"component_parameters": {
"common": {
"hetero_pearson_0": {
"column_indexes": -1,
"cross_parties": false
}
},
"role": {
"host": {
"0": {
"reader_0": {
"table": {
"name": "breast_hetero_host",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": false
}
}
},
"guest": {
"0": {
"hetero_pearson_0": {
"need_run": false
},
"reader_0": {
"table": {
"name": "breast_hetero_guest",
"namespace": "experiment"
}
},
"data_transform_0": {
"with_label": true,
"output_format": "dense"
}
}
}
}
}
}
test_hetero_pearson_host_only_dsl.json
{
"components": {
"reader_0": {
"module": "Reader",
"output": {
"data": [
"data"
]
}
},
"data_transform_0": {
"module": "DataTransform",
"input": {
"data": {
"data": [
"reader_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
},
"intersection_0": {
"module": "Intersection",
"input": {
"data": {
"data": [
"data_transform_0.data"
]
}
},
"output": {
"data": [
"data"
]
}
},
"hetero_pearson_0": {
"module": "HeteroPearson",
"input": {
"data": {
"train_data": [
"intersection_0.data"
]
}
},
"output": {
"data": [
"data"
],
"model": [
"model"
]
}
}
}
}