Federation API¶
Low level api¶
__all__
special
¶
Classes¶
FederationABC
¶
Federation interface: get objects and tables from other parties, or remote (send) them to other parties.
Source code in fate_arch/abc/_federation.py
class FederationABC(metaclass=ABCMeta):
    """
    federation interface: get objects and tables from other parties, or
    remote (send) objects and tables to them
    """

    @abc.abstractmethod
    def get(self, name: str,
            tag: str,
            parties: typing.List[Party],
            gc: GarbageCollectionABC) -> typing.List:
        """
        get objects/tables from ``parties``

        Parameters
        ----------
        name: str
            name of transfer variable
        tag: str
            tag to distinguish each transfer
        parties: typing.List[Party]
            parties to get objects/tables from
        gc: GarbageCollectionABC
            used to do some clean jobs

        Returns
        -------
        list
            a list of objects or a list of tables got from the parties,
            in the same order as ``parties``
        """
        ...

    @abc.abstractmethod
    def remote(self, v,
               name: str,
               tag: str,
               parties: typing.List[Party],
               gc: GarbageCollectionABC) -> None:
        """
        remote object/table to ``parties``

        Parameters
        ----------
        v: object or table
            object/table to remote
        name: str
            name of transfer variable
        tag: str
            tag to distinguish each transfer
        parties: typing.List[Party]
            parties to remote object/table to
        gc: GarbageCollectionABC
            used to do some clean jobs

        Returns
        -------
        None
        """
        # annotated ``None`` rather than ``typing.NoReturn``: the call returns
        # normally, whereas ``NoReturn`` marks functions that never return
        ...
Methods¶
get(self, name, tag, parties, gc)
¶
get objects/tables from parties
Parameters:
Name | Type | Description | Default |
---|---|---|---|
name |
str |
name of transfer variable |
required |
tag |
str |
tag to distinguish each transfer |
required |
parties |
List[fate_arch.common._types.Party] |
parties to get objects/tables from |
required |
gc |
GarbageCollectionABC |
used to do some clean jobs |
required |
Returns:
Type | Description |
---|---|
List |
a list of objects or a list of tables got from the parties, in the same order as `parties` |
Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def get(
    self,
    name: str,
    tag: str,
    parties: typing.List[Party],
    gc: GarbageCollectionABC,
) -> typing.List:
    """
    Fetch the objects/tables sent by ``parties``.

    Parameters
    ----------
    name: str
        name of the transfer variable
    tag: str
        tag that distinguishes this transfer from others
    parties: typing.List[Party]
        parties whose objects/tables are fetched
    gc: GarbageCollectionABC
        used to do some clean jobs

    Returns
    -------
    list
        the fetched objects or tables, in the same order as ``parties``
    """
    ...
remote(self, v, name, tag, parties, gc)
¶
remote object/table to parties
Parameters:
Name | Type | Description | Default |
---|---|---|---|
v |
object or table |
object/table to remote |
required |
name |
str |
name of transfer variable |
required |
tag |
str |
tag to distinguish each transfer |
required |
parties |
List[fate_arch.common._types.Party] |
parties to remote object/table to |
required |
gc |
GarbageCollectionABC |
used to do some clean jobs |
required |
Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def remote(self, v,
           name: str,
           tag: str,
           parties: typing.List[Party],
           gc: GarbageCollectionABC) -> None:
    """
    remote object/table to ``parties``

    Parameters
    ----------
    v: object or table
        object/table to remote
    name: str
        name of transfer variable
    tag: str
        tag to distinguish each transfer
    parties: typing.List[Party]
        parties to remote object/table to
    gc: GarbageCollectionABC
        used to do some clean jobs

    Returns
    -------
    None
    """
    # annotated ``None`` rather than ``typing.NoReturn``: the call returns
    # normally, whereas ``NoReturn`` marks functions that never return
    ...
user api¶
Remoting an object (or table) to other parties, or getting one from them, is easy with the APIs provided by Variable.
First, create an instance of BaseTransferVariables, which is simply a collection of Variables:
from federatedml.transfer_variable.transfer_class import secure_add_example_transfer_variable
variable = secure_add_example_transfer_variable.SecureAddExampleTransferVariable()
Then remote or get an object (or table) through a variable provided by this instance:
# remote
variable.guest_share.remote("from guest")
# get
variable.guest_share.get()
LOGGER
¶
__all__
special
¶
Classes¶
FederationTagNamespace
¶
Source code in fate_arch/federation/transfer_variable.py
class FederationTagNamespace(object):
    """Class-level namespace used as the leading component of federation tags."""

    __namespace = "default"

    @classmethod
    def set_namespace(cls, namespace):
        """Replace the namespace that ``generate_tag`` puts in front of every tag."""
        cls.__namespace = namespace

    @classmethod
    def generate_tag(cls, *suffix):
        """Return the namespace followed by the stringified ``suffix`` items, joined by dots."""
        parts = [cls.__namespace]
        parts.extend(str(item) for item in suffix)
        return ".".join(parts)
set_namespace(namespace)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def set_namespace(cls, namespace):
    """Store ``namespace`` on ``cls`` as the namespace used for tag generation."""
    cls.__namespace = namespace
generate_tag(*suffix)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def generate_tag(cls, *suffix):
    """Return the namespace followed by the stringified ``suffix`` items, joined by dots."""
    parts = [cls.__namespace]
    parts.extend(str(item) for item in suffix)
    return ".".join(parts)
IterationGC (GarbageCollectionABC)
¶
Source code in fate_arch/federation/transfer_variable.py
class IterationGC(GarbageCollectionABC):
    """
    Deferred clean-up actions, grouped into buckets by transfer tag.

    Actions registered consecutively under the same tag share one bucket.
    ``gc`` runs the oldest bucket once more than ``capacity`` buckets are
    pending; ``clean`` runs every pending bucket.
    """

    def __init__(self, capacity=2):
        """
        Parameters
        ----------
        capacity: int
            ``gc`` only collects while more than this many buckets are pending
        """
        # each bucket holds the (obj, method name, kwargs) actions of one tag
        self._ashcan: deque[typing.List[typing.Tuple[typing.Any, str, dict]]] = deque()
        self._last_tag: typing.Optional[str] = None
        self._capacity = capacity
        self._enable = True

    def add_gc_action(self, tag: str, obj, method, args_dict):
        """
        Register ``getattr(obj, method)(**args_dict)`` to be called on collection.

        Parameters
        ----------
        tag: str
            transfer tag; consecutive actions with the same tag share a bucket
        obj: object
            object owning the clean-up method
        method: str
            name of the method to call on ``obj``
        args_dict: dict
            keyword arguments passed to the method
        """
        # The bucket of the last tag may already have been collected by
        # ``gc``/``clean``, leaving the deque empty; indexing ``[-1]`` would
        # then raise IndexError, so only append while a bucket is pending.
        if self._ashcan and self._last_tag == tag:
            self._ashcan[-1].append((obj, method, args_dict))
        else:
            self._ashcan.append([(obj, method, args_dict)])
            self._last_tag = tag

    def disable(self):
        """Turn ``gc`` into a no-op; ``clean`` is unaffected."""
        self._enable = False

    def set_capacity(self, capacity):
        """Set the number of pending buckets above which ``gc`` collects."""
        self._capacity = capacity

    def gc(self):
        """Run the oldest bucket if enabled and more than ``capacity`` buckets are pending."""
        if not self._enable:
            return
        if len(self._ashcan) <= self._capacity:
            return
        self._safe_gc_call(self._ashcan.popleft())

    def clean(self):
        """Run every pending bucket, newest first, regardless of ``disable``."""
        while self._ashcan:
            self._safe_gc_call(self._ashcan.pop())
        # no bucket is left, so the next action must open a new one
        self._last_tag = None

    @staticmethod
    def _safe_gc_call(actions: typing.List[typing.Tuple[typing.Any, str, dict]]):
        """Run each ``(obj, method, args_dict)`` action; failures are logged and ignored."""
        for obj, method, args_dict in actions:
            try:
                LOGGER.debug(f"[CLEAN]deleting {obj}, {method}, {args_dict}")
                getattr(obj, method)(**args_dict)
            except Exception as e:
                # best effort: a failing action must not block the remaining ones
                LOGGER.debug(f"[CLEAN]this could be ignore {e}")
__init__(self, capacity=2)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, capacity=2):
    """
    Start with no pending buckets, collection enabled and the given capacity.

    Parameters
    ----------
    capacity: int
        stored as ``_capacity``
    """
    self._capacity = capacity
    self._enable = True
    self._last_tag: typing.Optional[str] = None
    # each bucket holds the (obj, method name, kwargs) actions of one tag
    self._ashcan: deque[typing.List[typing.Tuple[typing.Any, str, dict]]] = deque()
add_gc_action(self, tag, obj, method, args_dict)
¶
Source code in fate_arch/federation/transfer_variable.py
def add_gc_action(self, tag: str, obj, method, args_dict):
    """
    Register ``(obj, method, args_dict)`` as a clean-up action for ``tag``.

    Parameters
    ----------
    tag: str
        transfer tag; consecutive actions with the same tag share a bucket
    obj: object
        object owning the clean-up method
    method: str
        name of the method to call on ``obj``
    args_dict: dict
        keyword arguments for the method
    """
    # The bucket of the last tag may already have been popped, leaving the
    # deque empty; indexing ``[-1]`` would then raise IndexError, so only
    # append while a bucket is still pending.
    if self._ashcan and self._last_tag == tag:
        self._ashcan[-1].append((obj, method, args_dict))
    else:
        self._ashcan.append([(obj, method, args_dict)])
        self._last_tag = tag
disable(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def disable(self):
    """Clear the ``_enable`` flag."""
    self._enable = False
set_capacity(self, capacity)
¶
Source code in fate_arch/federation/transfer_variable.py
def set_capacity(self, capacity):
    """Store ``capacity`` as the new ``_capacity``."""
    self._capacity = capacity
gc(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def gc(self):
    """Run the oldest bucket when enabled and more than ``_capacity`` buckets are pending."""
    if self._enable and len(self._ashcan) > self._capacity:
        oldest = self._ashcan.popleft()
        self._safe_gc_call(oldest)
clean(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    """Run every pending bucket, newest first, then forget the last tag."""
    while self._ashcan:
        self._safe_gc_call(self._ashcan.pop())
    # No bucket is left: clear the tag so a later action with the same tag
    # opens a new bucket instead of indexing into the empty deque.
    self._last_tag = None
Variable
¶
variable to distinguish federation by name
Source code in fate_arch/federation/transfer_variable.py
class Variable(object):
    """
    Transfer variable identified by name; one instance is registered per name.
    """

    __instances: typing.MutableMapping[str, "Variable"] = {}

    @classmethod
    def get_or_create(
        cls, name, create_func: typing.Callable[[], "Variable"]
    ) -> "Variable":
        """Return the instance registered under ``name``, calling ``create_func`` on first use."""
        registry = cls.__instances
        if name in registry:
            return registry[name]
        registry[name] = create_func()
        return registry[name]

    def __init__(
        self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
    ):
        """
        Parameters
        ----------
        name: str
            dotted name with at least three components
        src: tuple of str
            roles allowed to remote with this variable
        dst: tuple of str
            roles allowed to get with this variable
        """
        registry = self.__instances
        if name in registry:
            raise RuntimeError(
                f"{registry[name]} with {name} already initialized, which expected to be an singleton object."
            )
        assert (
            name.count(".") >= 2
        ), "incorrect name format, should be `module_name.class_name.variable_name`"

        self._name = name
        self._short_name = self._get_short_name(name)
        self._use_short_name = True
        self._src = src
        self._dst = dst
        # separate collectors for received and sent data
        self._get_gc = IterationGC()
        self._remote_gc = IterationGC()

    @staticmethod
    def _get_short_name(name):
        """Return ``hash.<blake2b hex digest of name>.<last dotted component of name>``."""
        digest = hashlib.blake2b(name.encode("utf-8"), digest_size=10).hexdigest()
        _prefix, last_part = name.rsplit(".", 1)
        return ".".join(("hash", digest, last_part))

    def __copy__(self):
        """Copying returns this very instance; no new instance is created."""
        return self

    def __deepcopy__(self, memo):
        """Deep-copying returns this very instance; no new instance is created."""
        return self

    def set_preserve_num(self, n):
        """Set the capacity of both collectors to ``n`` and return ``self``."""
        for collector in (self._get_gc, self._remote_gc):
            collector.set_capacity(n)
        return self

    def disable_auto_clean(self):
        """Disable both collectors and return ``self``."""
        for collector in (self._get_gc, self._remote_gc):
            collector.disable()
        return self

    def clean(self):
        """Run ``clean`` on the get collector, then on the remote collector."""
        for collector in (self._get_gc, self._remote_gc):
            collector.clean()

    def remote_parties(
        self,
        obj,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        Send ``obj`` to the given parties.

        Parameters
        ----------
        obj: object or table
            object or table to remote
        parties: typing.List[Party] or Party
            parties to remote the object/table to; a single Party is wrapped in a list
        suffix: str or tuple of str
            suffix used to distinguish transfers within this variable

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            if the role of a target party is not in ``dst``, or the local role is not in ``src``
        """
        session = get_session()
        targets = [parties] if isinstance(parties, Party) else parties
        tag_parts = suffix if isinstance(suffix, tuple) else (suffix,)
        tag = FederationTagNamespace.generate_tag(*tag_parts)

        for party in targets:
            if party.role in self._dst:
                continue
            raise RuntimeError(
                f"not allowed to remote object to {party} using {self._name}"
            )
        local = session.parties.local_party.role
        if local not in self._src:
            raise RuntimeError(
                f"not allowed to remote object from {local} using {self._name}"
            )

        if self._use_short_name:
            name = self._short_name
        else:
            name = self._name

        timer = profile.federation_remote_timer(name, self._name, tag, local, targets)
        session.federation.remote(
            v=obj, name=name, tag=tag, parties=targets, gc=self._remote_gc
        )
        timer.done(session.federation)

        self._remote_gc.gc()

    def get_parties(
        self,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        Fetch objects/tables from the given parties.

        Parameters
        ----------
        parties: typing.List[Party] or Party
            parties to get objects/tables from; a non-list value is wrapped in a list
        suffix: str or tuple of str
            suffix used to distinguish transfers within this variable

        Returns
        -------
        list
            objects/tables got from the parties, in the same order as ``parties``

        Raises
        ------
        RuntimeError
            if the role of a source party is not in ``src``, or the local role is not in ``dst``
        """
        session = get_session()
        sources = parties if isinstance(parties, list) else [parties]
        tag_parts = suffix if isinstance(suffix, tuple) else (suffix,)
        tag = FederationTagNamespace.generate_tag(*tag_parts)

        for party in sources:
            if party.role in self._src:
                continue
            raise RuntimeError(
                f"not allowed to get object from {party} using {self._name}"
            )
        local = session.parties.local_party.role
        if local not in self._dst:
            raise RuntimeError(
                f"not allowed to get object to {local} using {self._name}"
            )

        if self._use_short_name:
            name = self._short_name
        else:
            name = self._name

        timer = profile.federation_get_timer(name, self._name, tag, local, sources)
        received = session.federation.get(
            name=name, tag=tag, parties=sources, gc=self._get_gc
        )
        timer.done(session.federation)

        self._get_gc.gc()
        return received

    def remote(self, obj, role=None, idx=-1, suffix=tuple()):
        """
        Send ``obj`` to other parties.

        Args:
            obj: object to be sent
            role: role (or list of roles) of the parties to send to.
                The default is None, which selects the parties of the ``dst`` roles
            idx: index of the party to send to among the selected parties.
                The default is -1, which sends to all selected parties
            suffix: additional tag suffix, the default is tuple()

        Raises:
            ValueError: if ``idx`` is given without ``role``
            RuntimeError: if ``idx`` is not smaller than the number of selected parties
        """
        party_info = get_parties()
        by_index = idx >= 0
        if by_index and role is None:
            raise ValueError("role cannot be None if idx specified")

        # get subset of dst roles in runtime conf
        if role is None:
            parties = party_info.roles_to_parties(self._dst, strict=False)
        else:
            roles = [role] if isinstance(role, str) else role
            parties = party_info.roles_to_parties(roles)

        if by_index:
            count = len(parties)
            if idx >= count:
                raise RuntimeError(
                    f"try to remote to {idx}th party while only {count} configurated: {parties}, check {self._name}"
                )
            parties = parties[idx]
        return self.remote_parties(obj=obj, parties=parties, suffix=suffix)

    def get(self, idx=-1, role=None, suffix=tuple()):
        """
        Get objects from other parties.

        Args:
            idx: index (int) or list of indices of the parties to get from.
                The default is -1, which gets from all selected parties
            role: role (or list of roles) of the parties to get from.
                The default is None, which selects the parties of the ``src`` roles
            suffix: additional tag suffix, the default is tuple()

        Returns:
            a single object for a non-negative int ``idx``, otherwise a list of objects

        Raises:
            ValueError: if ``idx`` is neither an int nor a list
            RuntimeError: if an int ``idx`` is not smaller than the number of selected parties
        """
        if role is None:
            roles = self._src
        elif isinstance(role, str):
            roles = [role]
        else:
            roles = role
        src_parties = get_parties().roles_to_parties(roles=roles, strict=False)

        if isinstance(idx, list):
            chosen = [src_parties[i] for i in idx]
            return self.get_parties(parties=chosen, suffix=suffix)
        if not isinstance(idx, int):
            raise ValueError(
                f"illegal idx type: {type(idx)}, supported types: int or list of int"
            )
        if idx < 0:
            return self.get_parties(parties=src_parties, suffix=suffix)

        count = len(src_parties)
        if idx >= count:
            raise RuntimeError(
                f"try to get from {idx}th party while only {count} configurated: {src_parties}, check {self._name}"
            )
        return self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
Methods¶
get_or_create(name, create_func)
classmethod
¶
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def get_or_create(
    cls, name, create_func: typing.Callable[[], "Variable"]
) -> "Variable":
    """Return the instance registered under ``name``, calling ``create_func`` on first use."""
    registry = cls.__instances
    if name in registry:
        return registry[name]
    registry[name] = create_func()
    return registry[name]
__init__(self, name, src, dst)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __init__(
    self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
):
    """
    Parameters
    ----------
    name: str
        dotted name with at least three components
    src: tuple of str
        stored as ``_src``
    dst: tuple of str
        stored as ``_dst``

    Raises
    ------
    RuntimeError
        if an instance is already registered under ``name``
    """
    registry = self.__instances
    if name in registry:
        raise RuntimeError(
            f"{registry[name]} with {name} already initialized, which expected to be an singleton object."
        )
    assert (
        name.count(".") >= 2
    ), "incorrect name format, should be `module_name.class_name.variable_name`"

    self._name = name
    self._short_name = self._get_short_name(name)
    self._use_short_name = True
    self._src = src
    self._dst = dst
    # separate collectors for received and sent data
    self._get_gc = IterationGC()
    self._remote_gc = IterationGC()
__copy__(self)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    """Return this very instance; copying never creates a new one."""
    return self
__deepcopy__(self, memo)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    """Return this very instance; deep-copying never creates a new one and ``memo`` is unused."""
    return self
set_preserve_num(self, n)
¶
Source code in fate_arch/federation/transfer_variable.py
def set_preserve_num(self, n):
    """Set the capacity of the get and remote collectors to ``n`` and return ``self``."""
    for collector in (self._get_gc, self._remote_gc):
        collector.set_capacity(n)
    return self
disable_auto_clean(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def disable_auto_clean(self):
    """Disable the get and remote collectors and return ``self``."""
    for collector in (self._get_gc, self._remote_gc):
        collector.disable()
    return self
clean(self)
¶
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    """Run ``clean`` on the get collector, then on the remote collector."""
    for collector in (self._get_gc, self._remote_gc):
        collector.clean()
remote_parties(self, obj, parties, suffix=())
¶
remote object to specified parties
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
object or table |
object or table to remote |
required |
parties |
Union[List[fate_arch.common._types.Party], fate_arch.common._types.Party] |
parties to remote object/table to |
required |
suffix |
Union[Any, Tuple] |
suffix used to distinguish federation with in variable |
() |
Source code in fate_arch/federation/transfer_variable.py
def remote_parties(
    self,
    obj,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    Send ``obj`` to the given parties.

    Parameters
    ----------
    obj: object or table
        object or table to remote
    parties: typing.List[Party] or Party
        parties to remote the object/table to; a single Party is wrapped in a list
    suffix: str or tuple of str
        suffix used to distinguish transfers within this variable

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        if the role of a target party is not in ``dst``, or the local role is not in ``src``
    """
    session = get_session()
    targets = [parties] if isinstance(parties, Party) else parties
    tag_parts = suffix if isinstance(suffix, tuple) else (suffix,)
    tag = FederationTagNamespace.generate_tag(*tag_parts)

    for party in targets:
        if party.role in self._dst:
            continue
        raise RuntimeError(
            f"not allowed to remote object to {party} using {self._name}"
        )
    local = session.parties.local_party.role
    if local not in self._src:
        raise RuntimeError(
            f"not allowed to remote object from {local} using {self._name}"
        )

    if self._use_short_name:
        name = self._short_name
    else:
        name = self._name

    timer = profile.federation_remote_timer(name, self._name, tag, local, targets)
    session.federation.remote(
        v=obj, name=name, tag=tag, parties=targets, gc=self._remote_gc
    )
    timer.done(session.federation)

    self._remote_gc.gc()
get_parties(self, parties, suffix=())
¶
get objects/tables from specified parties
Parameters:
Name | Type | Description | Default |
---|---|---|---|
parties |
Union[List[fate_arch.common._types.Party], fate_arch.common._types.Party] |
parties to get objects/tables from |
required |
suffix |
Union[Any, Tuple] |
suffix used to distinguish federation with in variable |
() |
Returns:
Type | Description |
---|---|
list |
a list of objects/tables got from the parties, in the same order as `parties` |
Source code in fate_arch/federation/transfer_variable.py
def get_parties(
    self,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    Fetch objects/tables from the given parties.

    Parameters
    ----------
    parties: typing.List[Party] or Party
        parties to get objects/tables from; a non-list value is wrapped in a list
    suffix: str or tuple of str
        suffix used to distinguish transfers within this variable

    Returns
    -------
    list
        objects/tables got from the parties, in the same order as ``parties``

    Raises
    ------
    RuntimeError
        if the role of a source party is not in ``src``, or the local role is not in ``dst``
    """
    session = get_session()
    sources = parties if isinstance(parties, list) else [parties]
    tag_parts = suffix if isinstance(suffix, tuple) else (suffix,)
    tag = FederationTagNamespace.generate_tag(*tag_parts)

    for party in sources:
        if party.role in self._src:
            continue
        raise RuntimeError(
            f"not allowed to get object from {party} using {self._name}"
        )
    local = session.parties.local_party.role
    if local not in self._dst:
        raise RuntimeError(
            f"not allowed to get object to {local} using {self._name}"
        )

    if self._use_short_name:
        name = self._short_name
    else:
        name = self._name

    timer = profile.federation_get_timer(name, self._name, tag, local, sources)
    received = session.federation.get(
        name=name, tag=tag, parties=sources, gc=self._get_gc
    )
    timer.done(session.federation)

    self._get_gc.gc()
    return received
remote(self, obj, role=None, idx=-1, suffix=())
¶
send obj to other parties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
obj |
None |
object to be sent |
required |
role |
None |
role of the parties to send to; use one of ['Host', 'Guest', 'Arbiter', None]. The default is None, which means the values are sent to parties regardless of their party role |
None |
idx |
None |
index of the party to send to. The default is -1, which means the values are sent to parties regardless of their party id |
-1 |
suffix |
None |
additional tag suffix, the default is tuple() |
() |
Source code in fate_arch/federation/transfer_variable.py
def remote(self, obj, role=None, idx=-1, suffix=tuple()):
    """
    Send ``obj`` to other parties.

    Args:
        obj: object to be sent
        role: role (or list of roles) of the parties to send to.
            The default is None, which selects the parties of the ``dst`` roles
        idx: index of the party to send to among the selected parties.
            The default is -1, which sends to all selected parties
        suffix: additional tag suffix, the default is tuple()

    Raises:
        ValueError: if ``idx`` is given without ``role``
        RuntimeError: if ``idx`` is not smaller than the number of selected parties
    """
    party_info = get_parties()
    by_index = idx >= 0
    if by_index and role is None:
        raise ValueError("role cannot be None if idx specified")

    # get subset of dst roles in runtime conf
    if role is None:
        parties = party_info.roles_to_parties(self._dst, strict=False)
    else:
        roles = [role] if isinstance(role, str) else role
        parties = party_info.roles_to_parties(roles)

    if by_index:
        count = len(parties)
        if idx >= count:
            raise RuntimeError(
                f"try to remote to {idx}th party while only {count} configurated: {parties}, check {self._name}"
            )
        parties = parties[idx]
    return self.remote_parties(obj=obj, parties=parties, suffix=suffix)
get(self, idx=-1, role=None, suffix=())
¶
get obj from other parties.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
idx |
None |
index of the party to get from. The default is -1, which means values are got from parties regardless of their party id |
-1 |
suffix |
None |
additional tag suffix, the default is tuple() |
() |
Source code in fate_arch/federation/transfer_variable.py
def get(self, idx=-1, role=None, suffix=tuple()):
    """
    Get objects from other parties.

    Args:
        idx: index (int) or list of indices of the parties to get from.
            The default is -1, which gets from all selected parties
        role: role (or list of roles) of the parties to get from.
            The default is None, which selects the parties of the ``src`` roles
        suffix: additional tag suffix, the default is tuple()

    Returns:
        a single object for a non-negative int ``idx``, otherwise a list of objects

    Raises:
        ValueError: if ``idx`` is neither an int nor a list
        RuntimeError: if an int ``idx`` is not smaller than the number of selected parties
    """
    if role is None:
        roles = self._src
    elif isinstance(role, str):
        roles = [role]
    else:
        roles = role
    src_parties = get_parties().roles_to_parties(roles=roles, strict=False)

    if isinstance(idx, list):
        chosen = [src_parties[i] for i in idx]
        return self.get_parties(parties=chosen, suffix=suffix)
    if not isinstance(idx, int):
        raise ValueError(
            f"illegal idx type: {type(idx)}, supported types: int or list of int"
        )
    if idx < 0:
        return self.get_parties(parties=src_parties, suffix=suffix)

    count = len(src_parties)
    if idx >= count:
        raise RuntimeError(
            f"try to get from {idx}th party while only {count} configurated: {src_parties}, check {self._name}"
        )
    return self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
BaseTransferVariables
¶
Source code in fate_arch/federation/transfer_variable.py
class BaseTransferVariables(object):
    """Base class for collections of transfer variables."""

    def __init__(self, *args):
        """Accept and ignore any positional arguments."""
        pass

    def __copy__(self):
        """Copying returns this very instance."""
        return self

    def __deepcopy__(self, memo):
        """Deep-copying returns this very instance."""
        return self

    @staticmethod
    def set_flowid(flowid):
        """
        Use ``flowid`` as the global namespace of federation tags.

        Parameters
        ----------
        flowid: str
            namespace; converted with ``str`` before it is stored

        Returns
        -------
        None
        """
        namespace = str(flowid)
        FederationTagNamespace.set_namespace(namespace)

    def _create_variable(
        self, name: str, src: typing.Iterable[str], dst: typing.Iterable[str]
    ) -> Variable:
        """
        Return the variable named ``<module>.<class name>.<name>``, creating it on first use.

        ``src`` and ``dst`` are converted to tuples when the variable is created.
        """
        owner = self.__class__
        full_name = f"{self.__module__}.{owner.__name__}.{name}"

        def build():
            return Variable(name=full_name, src=tuple(src), dst=tuple(dst))

        return Variable.get_or_create(full_name, build)

    @staticmethod
    def all_parties():
        """
        Return all parties.

        Returns
        -------
        list
            list of parties
        """
        party_info = get_parties()
        return party_info.all_parties

    @staticmethod
    def local_party():
        """
        Return the local party.

        Returns
        -------
        Party
            party this program is running on
        """
        party_info = get_parties()
        return party_info.local_party
Methods¶
__init__(self, *args)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, *args):
    """Accept and ignore any positional arguments."""
    return None
__copy__(self)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    """Return this very instance instead of a copy."""
    return self
__deepcopy__(self, memo)
special
¶
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    """Return this very instance instead of a deep copy; ``memo`` is unused."""
    return self
set_flowid(flowid)
staticmethod
¶
set global namespace for federations.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
flowid |
str |
namespace |
required |
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def set_flowid(flowid):
    """
    Use ``flowid`` as the global namespace of federation tags.

    Parameters
    ----------
    flowid: str
        namespace; converted with ``str`` before it is stored

    Returns
    -------
    None
    """
    namespace = str(flowid)
    FederationTagNamespace.set_namespace(namespace)
all_parties()
staticmethod
¶
get all parties
Returns:
Type | Description |
---|---|
list |
list of parties |
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def all_parties():
    """
    Return all parties.

    Returns
    -------
    list
        list of parties
    """
    party_info = get_parties()
    return party_info.all_parties
local_party()
staticmethod
¶
indicate local party
Returns:
Type | Description |
---|---|
Party |
party this program running on |
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def local_party():
    """
    Return the local party.

    Returns
    -------
    Party
        party this program is running on
    """
    party_info = get_parties()
    return party_info.local_party