Skip to content

Federation API

Low level api

__all__ special

Classes

FederationABC

federation, get or remote objects and tables

Source code in fate_arch/abc/_federation.py
class FederationABC(metaclass=ABCMeta):
    """
    federation, get or remote objects and tables
    """

    @abc.abstractmethod
    def get(self, name: str,
            tag: str,
            parties: typing.List[Party],
            gc: GarbageCollectionABC) -> typing.List:
        """
        get objects/tables from ``parties``

        Parameters
        ----------
        name: str
           name of transfer variable
        tag: str
           tag to distinguish each transfer
        parties: typing.List[Party]
           parties to get objects/tables from
        gc: GarbageCollectionABC
           used to do some clean jobs

        Returns
        -------
        list
           a list of object or a list of table get from parties with same order of `parties`

        """
        ...

    @abc.abstractmethod
    def remote(self, v,
               name: str,
               tag: str,
               parties: typing.List[Party],
               gc: GarbageCollectionABC) -> typing.NoReturn:
        """
        remote object/table to ``parties``

        Parameters
        ----------
        v: object or table
           object/table to remote
        name: str
           name of transfer variable
        tag: str
           tag to distinguish each transfer
        parties: typing.List[Party]
           parties to remote object/table to
        gc: GarbageCollectionABC
           used to do some clean jobs

        Returns
        -------
        Notes
        """
        ...

Methods

get(self, name, tag, parties, gc)

get objects/tables from parties

Parameters:

Name Type Description Default
name str

name of transfer variable

required
tag str

tag to distinguish each transfer

required
parties List[fate_arch.common._types.Party]

parties to get objects/tables from

required
gc GarbageCollectionABC

used to do some clean jobs

required

Returns:

Type Description
List

a list of object or a list of table get from parties with same order of parties

Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def get(self, name: str,
        tag: str,
        parties: typing.List[Party],
        gc: GarbageCollectionABC) -> typing.List:
    """
    get objects/tables from ``parties``

    Parameters
    ----------
    name: str
       name of transfer variable
    tag: str
       tag to distinguish each transfer
    parties: typing.List[Party]
       parties to get objects/tables from
    gc: GarbageCollectionABC
       used to do some clean jobs

    Returns
    -------
    list
       a list of object or a list of table get from parties with same order of `parties`

    """
    ...
remote(self, v, name, tag, parties, gc)

remote object/table to parties

Parameters:

Name Type Description Default
v object or table

object/table to remote

required
name str

name of transfer variable

required
tag str

tag to distinguish each transfer

required
parties List[fate_arch.common._types.Party]

parties to remote object/table to

required
gc GarbageCollectionABC

used to do some clean jobs

required
Source code in fate_arch/abc/_federation.py
@abc.abstractmethod
def remote(self, v,
           name: str,
           tag: str,
           parties: typing.List[Party],
           gc: GarbageCollectionABC) -> typing.NoReturn:
    """
    remote object/table to ``parties``

    Parameters
    ----------
    v: object or table
       object/table to remote
    name: str
       name of transfer variable
    tag: str
       tag to distinguish each transfer
    parties: typing.List[Party]
       parties to remote object/table to
    gc: GarbageCollectionABC
       used to do some clean jobs

    Returns
    -------
    Notes
    """
    ...

user api

remoting or getting an object(table) from other parties is quite easy using apis provided in Variable. First to create an instance of BaseTransferVariable, which is simply a collection of Variables:

from federatedml.transfer_variable.transfer_class import secure_add_example_transfer_variable
variable = secure_add_example_transfer_variable.SecureAddExampleTransferVariable()

Then remote or get object(table) by variable provided by this instance:

# remote
variable.guest_share.remote("from guest")

# get
variable.guest_share.get()

LOGGER

__all__ special

Classes

FederationTagNamespace

Source code in fate_arch/federation/transfer_variable.py
class FederationTagNamespace(object):
    __namespace = "default"

    @classmethod
    def set_namespace(cls, namespace):
        cls.__namespace = namespace

    @classmethod
    def generate_tag(cls, *suffix):
        tags = (cls.__namespace, *map(str, suffix))
        return ".".join(tags)
set_namespace(namespace) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def set_namespace(cls, namespace):
    cls.__namespace = namespace
generate_tag(*suffix) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def generate_tag(cls, *suffix):
    tags = (cls.__namespace, *map(str, suffix))
    return ".".join(tags)

IterationGC (GarbageCollectionABC)

Source code in fate_arch/federation/transfer_variable.py
class IterationGC(GarbageCollectionABC):
    def __init__(self, capacity=2):
        self._ashcan: deque[typing.List[typing.Tuple[typing.Any, str, dict]]] = deque()
        self._last_tag: typing.Optional[str] = None
        self._capacity = capacity
        self._enable = True

    def add_gc_action(self, tag: str, obj, method, args_dict):
        if self._last_tag == tag:
            self._ashcan[-1].append((obj, method, args_dict))
        else:
            self._ashcan.append([(obj, method, args_dict)])
            self._last_tag = tag

    def disable(self):
        self._enable = False

    def set_capacity(self, capacity):
        self._capacity = capacity

    def gc(self):
        if not self._enable:
            return
        if len(self._ashcan) <= self._capacity:
            return
        self._safe_gc_call(self._ashcan.popleft())

    def clean(self):
        while self._ashcan:
            self._safe_gc_call(self._ashcan.pop())

    @staticmethod
    def _safe_gc_call(actions: typing.List[typing.Tuple[typing.Any, str, dict]]):
        for obj, method, args_dict in actions:
            try:
                LOGGER.debug(f"[CLEAN]deleting {obj}, {method}, {args_dict}")
                getattr(obj, method)(**args_dict)
            except Exception as e:
                LOGGER.debug(f"[CLEAN]this could be ignore {e}")
__init__(self, capacity=2) special
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, capacity=2):
    self._ashcan: deque[typing.List[typing.Tuple[typing.Any, str, dict]]] = deque()
    self._last_tag: typing.Optional[str] = None
    self._capacity = capacity
    self._enable = True
add_gc_action(self, tag, obj, method, args_dict)
Source code in fate_arch/federation/transfer_variable.py
def add_gc_action(self, tag: str, obj, method, args_dict):
    if self._last_tag == tag:
        self._ashcan[-1].append((obj, method, args_dict))
    else:
        self._ashcan.append([(obj, method, args_dict)])
        self._last_tag = tag
disable(self)
Source code in fate_arch/federation/transfer_variable.py
def disable(self):
    self._enable = False
set_capacity(self, capacity)
Source code in fate_arch/federation/transfer_variable.py
def set_capacity(self, capacity):
    self._capacity = capacity
gc(self)
Source code in fate_arch/federation/transfer_variable.py
def gc(self):
    if not self._enable:
        return
    if len(self._ashcan) <= self._capacity:
        return
    self._safe_gc_call(self._ashcan.popleft())
clean(self)
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    while self._ashcan:
        self._safe_gc_call(self._ashcan.pop())

Variable

variable to distinguish federation by name

Source code in fate_arch/federation/transfer_variable.py
class Variable(object):
    """
    variable to distinguish federation by name
    """

    __instances: typing.MutableMapping[str, "Variable"] = {}

    @classmethod
    def get_or_create(
        cls, name, create_func: typing.Callable[[], "Variable"]
    ) -> "Variable":
        if name not in cls.__instances:
            value = create_func()
            cls.__instances[name] = value
        return cls.__instances[name]

    def __init__(
        self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
    ):

        if name in self.__instances:
            raise RuntimeError(
                f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
            )

        assert (
            len(name.split(".")) >= 3
        ), "incorrect name format, should be `module_name.class_name.variable_name`"
        self._name = name
        self._src = src
        self._dst = dst
        self._get_gc = IterationGC()
        self._remote_gc = IterationGC()
        self._use_short_name = True
        self._short_name = self._get_short_name(self._name)

    @staticmethod
    def _get_short_name(name):
        fix_sized = hashlib.blake2b(name.encode("utf-8"), digest_size=10).hexdigest()
        _, right = name.rsplit(".", 1)
        return f"hash.{fix_sized}.{right}"

    # copy never create a new instance
    def __copy__(self):
        return self

    # deepcopy never create a new instance
    def __deepcopy__(self, memo):
        return self

    def set_preserve_num(self, n):
        self._get_gc.set_capacity(n)
        self._remote_gc.set_capacity(n)
        return self

    def disable_auto_clean(self):
        self._get_gc.disable()
        self._remote_gc.disable()
        return self

    def clean(self):
        self._get_gc.clean()
        self._remote_gc.clean()

    def remote_parties(
        self,
        obj,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        remote object to specified parties

        Parameters
        ----------
        obj: object or table
           object or table to remote
        parties: typing.List[Party]
           parties to remote object/table to
        suffix: str or tuple of str
           suffix used to distinguish federation with in variable

        Returns
        -------
        None
        """
        session = get_session()
        if isinstance(parties, Party):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._dst:
                raise RuntimeError(
                    f"not allowed to remote object to {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._src:
            raise RuntimeError(
                f"not allowed to remote object from {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name

        timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
        session.federation.remote(
            v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
        )
        timer.done(session.federation)

        self._remote_gc.gc()

    def get_parties(
        self,
        parties: Union[typing.List[Party], Party],
        suffix: Union[typing.Any, typing.Tuple] = tuple(),
    ):
        """
        get objects/tables from specified parties

        Parameters
        ----------
        parties: typing.List[Party]
           parties to remote object/table to
        suffix: str or tuple of str
           suffix used to distinguish federation with in variable

        Returns
        -------
        list
           a list of objects/tables get from parties with same order of ``parties``

        """
        session = get_session()
        if not isinstance(parties, list):
            parties = [parties]
        if not isinstance(suffix, tuple):
            suffix = (suffix,)
        tag = FederationTagNamespace.generate_tag(*suffix)

        for party in parties:
            if party.role not in self._src:
                raise RuntimeError(
                    f"not allowed to get object from {party} using {self._name}"
                )
        local = session.parties.local_party.role
        if local not in self._dst:
            raise RuntimeError(
                f"not allowed to get object to {local} using {self._name}"
            )

        name = self._short_name if self._use_short_name else self._name
        timer = profile.federation_get_timer(name, self._name, tag, local, parties)
        rtn = session.federation.get(
            name=name, tag=tag, parties=parties, gc=self._get_gc
        )
        timer.done(session.federation)

        self._get_gc.gc()

        return rtn

    def remote(self, obj, role=None, idx=-1, suffix=tuple()):
        """
        send obj to other parties.

        Args:
            obj: object to be sent
            role: role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None].
                The default is None, means sent values to parties regardless their party role
            idx: id of party to sent to.
                The default is -1, which means sent values to parties regardless their party id
            suffix: additional tag suffix, the default is tuple()
        """
        party_info = get_parties()
        if idx >= 0 and role is None:
            raise ValueError("role cannot be None if idx specified")

        # get subset of dst roles in runtime conf
        if role is None:
            parties = party_info.roles_to_parties(self._dst, strict=False)
        else:
            if isinstance(role, str):
                role = [role]
            parties = party_info.roles_to_parties(role)

        if idx >= 0:
            if idx >= len(parties):
                raise RuntimeError(
                    f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
                )
            parties = parties[idx]
        return self.remote_parties(obj=obj, parties=parties, suffix=suffix)

    def get(self, idx=-1, role=None, suffix=tuple()):
        """
        get obj from other parties.

        Args:
            idx: id of party to get from.
                The default is -1, which means get values from parties regardless their party id
            suffix: additional tag suffix, the default is tuple()

        Returns:
            object or list of object
        """
        if role is None:
            src_parties = get_parties().roles_to_parties(
                roles=self._src, strict=False
            )
        else:
            if isinstance(role, str):
                role = [role]
            src_parties = get_parties().roles_to_parties(
                roles=role, strict=False
            )
        if isinstance(idx, list):
            rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
        elif isinstance(idx, int):
            if idx < 0:
                rtn = self.get_parties(parties=src_parties, suffix=suffix)
            else:
                if idx >= len(src_parties):
                    raise RuntimeError(
                        f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                    )
                rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
        else:
            raise ValueError(
                f"illegal idx type: {type(idx)}, supported types: int or list of int"
            )
        return rtn

Methods

get_or_create(name, create_func) classmethod
Source code in fate_arch/federation/transfer_variable.py
@classmethod
def get_or_create(
    cls, name, create_func: typing.Callable[[], "Variable"]
) -> "Variable":
    if name not in cls.__instances:
        value = create_func()
        cls.__instances[name] = value
    return cls.__instances[name]
__init__(self, name, src, dst) special
Source code in fate_arch/federation/transfer_variable.py
def __init__(
    self, name: str, src: typing.Tuple[str, ...], dst: typing.Tuple[str, ...]
):

    if name in self.__instances:
        raise RuntimeError(
            f"{self.__instances[name]} with {name} already initialized, which expected to be an singleton object."
        )

    assert (
        len(name.split(".")) >= 3
    ), "incorrect name format, should be `module_name.class_name.variable_name`"
    self._name = name
    self._src = src
    self._dst = dst
    self._get_gc = IterationGC()
    self._remote_gc = IterationGC()
    self._use_short_name = True
    self._short_name = self._get_short_name(self._name)
__copy__(self) special
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    return self
__deepcopy__(self, memo) special
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    return self
set_preserve_num(self, n)
Source code in fate_arch/federation/transfer_variable.py
def set_preserve_num(self, n):
    self._get_gc.set_capacity(n)
    self._remote_gc.set_capacity(n)
    return self
disable_auto_clean(self)
Source code in fate_arch/federation/transfer_variable.py
def disable_auto_clean(self):
    self._get_gc.disable()
    self._remote_gc.disable()
    return self
clean(self)
Source code in fate_arch/federation/transfer_variable.py
def clean(self):
    self._get_gc.clean()
    self._remote_gc.clean()
remote_parties(self, obj, parties, suffix=())

remote object to specified parties

Parameters:

Name Type Description Default
obj object or table

object or table to remote

required
parties Union[List[fate_arch.common._types.Party], fate_arch.common._types.Party]

parties to remote object/table to

required
suffix Union[Any, Tuple]

suffix used to distinguish federation with in variable

()
Source code in fate_arch/federation/transfer_variable.py
def remote_parties(
    self,
    obj,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    remote object to specified parties

    Parameters
    ----------
    obj: object or table
       object or table to remote
    parties: typing.List[Party]
       parties to remote object/table to
    suffix: str or tuple of str
       suffix used to distinguish federation with in variable

    Returns
    -------
    None
    """
    session = get_session()
    if isinstance(parties, Party):
        parties = [parties]
    if not isinstance(suffix, tuple):
        suffix = (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    for party in parties:
        if party.role not in self._dst:
            raise RuntimeError(
                f"not allowed to remote object to {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._src:
        raise RuntimeError(
            f"not allowed to remote object from {local} using {self._name}"
        )

    name = self._short_name if self._use_short_name else self._name

    timer = profile.federation_remote_timer(name, self._name, tag, local, parties)
    session.federation.remote(
        v=obj, name=name, tag=tag, parties=parties, gc=self._remote_gc
    )
    timer.done(session.federation)

    self._remote_gc.gc()
get_parties(self, parties, suffix=())

get objects/tables from specified parties

Parameters:

Name Type Description Default
parties Union[List[fate_arch.common._types.Party], fate_arch.common._types.Party]

parties to remote object/table to

required
suffix Union[Any, Tuple]

suffix used to distinguish federation with in variable

()

Returns:

Type Description
list

a list of objects/tables get from parties with same order of parties

Source code in fate_arch/federation/transfer_variable.py
def get_parties(
    self,
    parties: Union[typing.List[Party], Party],
    suffix: Union[typing.Any, typing.Tuple] = tuple(),
):
    """
    get objects/tables from specified parties

    Parameters
    ----------
    parties: typing.List[Party]
       parties to remote object/table to
    suffix: str or tuple of str
       suffix used to distinguish federation with in variable

    Returns
    -------
    list
       a list of objects/tables get from parties with same order of ``parties``

    """
    session = get_session()
    if not isinstance(parties, list):
        parties = [parties]
    if not isinstance(suffix, tuple):
        suffix = (suffix,)
    tag = FederationTagNamespace.generate_tag(*suffix)

    for party in parties:
        if party.role not in self._src:
            raise RuntimeError(
                f"not allowed to get object from {party} using {self._name}"
            )
    local = session.parties.local_party.role
    if local not in self._dst:
        raise RuntimeError(
            f"not allowed to get object to {local} using {self._name}"
        )

    name = self._short_name if self._use_short_name else self._name
    timer = profile.federation_get_timer(name, self._name, tag, local, parties)
    rtn = session.federation.get(
        name=name, tag=tag, parties=parties, gc=self._get_gc
    )
    timer.done(session.federation)

    self._get_gc.gc()

    return rtn
remote(self, obj, role=None, idx=-1, suffix=())

send obj to other parties.

Parameters:

Name Type Description Default
obj None

object to be sent

required
role None

role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None]. The default is None, means sent values to parties regardless their party role

None
idx None

id of party to sent to. The default is -1, which means sent values to parties regardless their party id

-1
suffix None

additional tag suffix, the default is tuple()

()
Source code in fate_arch/federation/transfer_variable.py
def remote(self, obj, role=None, idx=-1, suffix=tuple()):
    """
    send obj to other parties.

    Args:
        obj: object to be sent
        role: role of parties to sent to, use one of ['Host', 'Guest', 'Arbiter', None].
            The default is None, means sent values to parties regardless their party role
        idx: id of party to sent to.
            The default is -1, which means sent values to parties regardless their party id
        suffix: additional tag suffix, the default is tuple()
    """
    party_info = get_parties()
    if idx >= 0 and role is None:
        raise ValueError("role cannot be None if idx specified")

    # get subset of dst roles in runtime conf
    if role is None:
        parties = party_info.roles_to_parties(self._dst, strict=False)
    else:
        if isinstance(role, str):
            role = [role]
        parties = party_info.roles_to_parties(role)

    if idx >= 0:
        if idx >= len(parties):
            raise RuntimeError(
                f"try to remote to {idx}th party while only {len(parties)} configurated: {parties}, check {self._name}"
            )
        parties = parties[idx]
    return self.remote_parties(obj=obj, parties=parties, suffix=suffix)
get(self, idx=-1, role=None, suffix=())

get obj from other parties.

Parameters:

Name Type Description Default
idx None

id of party to get from. The default is -1, which means get values from parties regardless their party id

-1
suffix None

additional tag suffix, the default is tuple()

()
Source code in fate_arch/federation/transfer_variable.py
def get(self, idx=-1, role=None, suffix=tuple()):
    """
    get obj from other parties.

    Args:
        idx: id of party to get from.
            The default is -1, which means get values from parties regardless their party id
        suffix: additional tag suffix, the default is tuple()

    Returns:
        object or list of object
    """
    if role is None:
        src_parties = get_parties().roles_to_parties(
            roles=self._src, strict=False
        )
    else:
        if isinstance(role, str):
            role = [role]
        src_parties = get_parties().roles_to_parties(
            roles=role, strict=False
        )
    if isinstance(idx, list):
        rtn = self.get_parties(parties=[src_parties[i] for i in idx], suffix=suffix)
    elif isinstance(idx, int):
        if idx < 0:
            rtn = self.get_parties(parties=src_parties, suffix=suffix)
        else:
            if idx >= len(src_parties):
                raise RuntimeError(
                    f"try to get from {idx}th party while only {len(src_parties)} configurated: {src_parties}, check {self._name}"
                )
            rtn = self.get_parties(parties=src_parties[idx], suffix=suffix)[0]
    else:
        raise ValueError(
            f"illegal idx type: {type(idx)}, supported types: int or list of int"
        )
    return rtn

BaseTransferVariables

Source code in fate_arch/federation/transfer_variable.py
class BaseTransferVariables(object):
    def __init__(self, *args):
        pass

    def __copy__(self):
        return self

    def __deepcopy__(self, memo):
        return self

    @staticmethod
    def set_flowid(flowid):
        """
        set global namespace for federations.

        Parameters
        ----------
        flowid: str
           namespace

        Returns
        -------
        None

        """
        FederationTagNamespace.set_namespace(str(flowid))

    def _create_variable(
        self, name: str, src: typing.Iterable[str], dst: typing.Iterable[str]
    ) -> Variable:
        full_name = f"{self.__module__}.{self.__class__.__name__}.{name}"
        return Variable.get_or_create(
            full_name, lambda: Variable(name=full_name, src=tuple(src), dst=tuple(dst))
        )

    @staticmethod
    def all_parties():
        """
        get all parties

        Returns
        -------
        list
           list of parties

        """
        return get_parties().all_parties

    @staticmethod
    def local_party():
        """
        indicate local party

        Returns
        -------
        Party
           party this program running on

        """
        return get_parties().local_party

Methods

__init__(self, *args) special
Source code in fate_arch/federation/transfer_variable.py
def __init__(self, *args):
    pass
__copy__(self) special
Source code in fate_arch/federation/transfer_variable.py
def __copy__(self):
    return self
__deepcopy__(self, memo) special
Source code in fate_arch/federation/transfer_variable.py
def __deepcopy__(self, memo):
    return self
set_flowid(flowid) staticmethod

set global namespace for federations.

Parameters:

Name Type Description Default
flowid str

namespace

required
Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def set_flowid(flowid):
    """
    set global namespace for federations.

    Parameters
    ----------
    flowid: str
       namespace

    Returns
    -------
    None

    """
    FederationTagNamespace.set_namespace(str(flowid))
all_parties() staticmethod

get all parties

Returns:

Type Description
list

list of parties

Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def all_parties():
    """
    get all parties

    Returns
    -------
    list
       list of parties

    """
    return get_parties().all_parties
local_party() staticmethod

indicate local party

Returns:

Type Description
Party

party this program running on

Source code in fate_arch/federation/transfer_variable.py
@staticmethod
def local_party():
    """
    indicate local party

    Returns
    -------
    Party
       party this program running on

    """
    return get_parties().local_party

Last update: 2021-11-15
Back to top