Skip to content

Computing API

Most of the time, users of federatedml do not need to know how to initialize a computing session, because FATE Flow already takes care of this. The exception is when you are writing unit tests that involve CTable-related functions. To initialize a computing session:

from fate_arch.session import computing_session
# initialize
computing_session.init(session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)

computing session

computing_session

Source code in fate_arch/session/_session.py
class computing_session(object):
    """
    static entry points for working with the global computing session
    """

    @staticmethod
    def init(session_id, options=None):
        """
        build a ``Session``, call ``as_global()`` on it and initialize its computing part

        Parameters
        ----------
        session_id:
           identifier forwarded to ``init_computing``
        options:
           forwarded unchanged to the ``Session`` constructor
        """
        session = Session(options=options)
        global_session = session.as_global()
        global_session.init_computing(session_id)

    @staticmethod
    def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
        """
        create a table from iterable ``data`` using the current computing session

        Parameters
        ----------
        data: typing.Iterable
           source data
        partition: int
           forwarded as ``partition`` to the session's ``parallelize``
        include_key: bool
           forwarded as ``include_key`` to the session's ``parallelize``
        kwargs:
           any further keyword arguments, forwarded unchanged

        Returns
        -------
        CTableABC
           whatever the session's ``parallelize`` returns
        """
        active = get_computing_session()
        return active.parallelize(data, partition=partition, include_key=include_key, **kwargs)

    @staticmethod
    def stop():
        """
        call ``stop()`` on the current computing session
        """
        active = get_computing_session()
        active.stop()
init(session_id, options=None) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def init(session_id, options=None):
    """
    build a ``Session``, call ``as_global()`` on it and initialize its computing part

    Parameters
    ----------
    session_id:
       identifier forwarded to ``init_computing``
    options:
       forwarded unchanged to the ``Session`` constructor
    """
    session = Session(options=options)
    global_session = session.as_global()
    global_session.init_computing(session_id)
parallelize(data, partition, include_key, **kwargs) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
    """
    create a table from iterable ``data`` using the current computing session

    Parameters
    ----------
    data: typing.Iterable
       source data
    partition: int
       forwarded as ``partition`` to the session's ``parallelize``
    include_key: bool
       forwarded as ``include_key`` to the session's ``parallelize``
    kwargs:
       any further keyword arguments, forwarded unchanged

    Returns
    -------
    CTableABC
       whatever the session's ``parallelize`` returns
    """
    active = get_computing_session()
    return active.parallelize(data, partition=partition, include_key=include_key, **kwargs)
stop() staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def stop():
    """
    call ``stop()`` on the current computing session
    """
    active = get_computing_session()
    active.stop()

computing table

After creating a table with the computing session, many distributed computing APIs are available on it.

CTableABC

a table of pair-like data supports distributed processing

Source code in fate_arch/abc/_computing.py
class CTableABC(metaclass=ABCMeta):
    """
    abstract interface of a table of pair-like (key, value) data that supports distributed processing

    Every member decorated with ``abc.abstractmethod`` must be implemented by subclasses.
    ``applyPartitions`` and ``schema`` are not marked abstract.
    """

    @property
    @abc.abstractmethod
    def engine(self):
        """
        get the engine name of table

        Returns
        -------
        str
           name of the computing engine backing this table
           (return type presumed to be a string; confirm against implementations)
        """
        ...

    @property
    @abc.abstractmethod
    def partitions(self):
        """
        get the partitions of table

        Returns
        -------
        int
           number of partitions
        """
        ...

    @abc.abstractmethod
    def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
        """
        save table

        Parameters
        ----------
        address: AddressABC
           address to save table to
        partitions: int
           number of partitions to save as
        schema: dict
           table schema
        kwargs:
           additional keyword arguments; their meaning is not specified by this interface
        """
        ...

    @abc.abstractmethod
    def collect(self, **kwargs) -> typing.Generator:
        """
        collect data from table

        Returns
        -------
        generator
           generator over the data of the table

        Notes
        ------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def take(self, n=1, **kwargs):
        """
        take ``n`` data from table

        Parameters
        ----------
        n: int
          number of data to take, defaults to 1

        Returns
        -------
        list
           a list of ``n`` data

        Notes
        ------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def first(self, **kwargs):
        """
        take one data from table

        Returns
        -------
        object
          a data from table

        Notes
        -------
        no order guarantee
        """
        ...

    @abc.abstractmethod
    def count(self) -> int:
        """
        number of data in table

        Returns
        -------
        int
           number of data
        """
        ...

    @abc.abstractmethod
    def map(self, func) -> 'CTableABC':
        """
        apply `func` to each data

        Parameters
        ----------
        func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
           function map (k1, v1) to (k2, v2)

        Returns
        -------
        CTableABC
           A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
        >>> b = a.map(lambda k, v: (k, v**2))
        >>> list(b.collect())
        [('k1', 1), ('k2', 4), ('k3', 9)]
        """
        ...

    @abc.abstractmethod
    def mapValues(self, func):
        """
        apply `func` to each value of data

        Parameters
        ----------
        func: ``typing.Callable[[object], object]``
           map v1 to v2

        Returns
        -------
        CTableABC
           A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
        >>> b = a.mapValues(lambda x: len(x))
        >>> list(b.collect())
        [('a', 3), ('b', 1)]
        """
        ...

    @abc.abstractmethod
    def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
        """
        apply ``func`` to each partition of table

        Parameters
        ----------
        func: ``typing.Callable[[iter], list]``
           accept an iterator of pair, return a list of pair
        use_previous_behavior: bool
           provided for compatibility reasons: if set to True, calling this method
           calls ``applyPartitions`` instead
        preserves_partitioning: bool
           flag indicating whether `func` preserves the partitioning

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
        >>> def f(iterator):
        ...     s = 0
        ...     for k, v in iterator:
        ...         s += v
        ...     return [(s, s)]
        ...
        >>> b = a.mapPartitions(f)
        >>> list(b.collect())
        [(6, 6), (9, 9)]
        """
        ...

    @abc.abstractmethod
    def mapReducePartitions(self, mapper, reducer, **kwargs):
        """
        apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`

        Parameters
        ----------
        mapper: ``typing.Callable[[iter], list]``
           accept an iterator of pair, return a list of pair
        reducer: ``typing.Callable[[object, object], object]``
           reduce v1, v2 to v3

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
        >>> def _mapper(it):
        ...     r = []
        ...     for k, v in it:
        ...         r.append((k % 3, v**2))
        ...         r.append((k % 2, v ** 3))
        ...     return r
        >>> def _reducer(a, b):
        ...     return a + b
        >>> output = table.mapReducePartitions(_mapper, _reducer)
        >>> collected = dict(output.collect())
        >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
        >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
        >>> assert collected[2] == 3 ** 2
        """

        ...

    def applyPartitions(self, func):
        """
        apply ``func`` to each partition as a single object

        Parameters
        ----------
        func: ``typing.Callable[[iter], object]``
           accepts an iterator, returns an object

        Returns
        -------
        CTableABC
           a new table, with each partition containing a single key-value pair

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
        >>> def f(it):
        ...    r = []
        ...    for k, v in it:
        ...        r.append((v, v**2, v**3))
        ...    return r
        >>> output = a.applyPartitions(f)
        >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]

        Notes
        ------
        unlike its siblings this method is not marked abstract; the body here is a stub
        """
        ...

    @abc.abstractmethod
    def flatMap(self, func):
        """
        apply a flat ``func`` to each data of table

        Parameters
        ----------
        func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
           a flat function that accepts two parameters (key, value) and returns a list of pairs

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
        >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
        >>> c = list(b.collect())
        >>> assert len(c) == 4
        >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
        """
        ...

    @abc.abstractmethod
    def reduce(self, func):
        """
        reduces all values in the pairs of the table with a binary function `func`

        Parameters
        ----------
        func: typing.Callable[[object, object], object]
           binary function reducing two values into one

        Returns
        -------
        object
           a single object

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
        >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))

        Notes
        ------
        `func` should be associative
        """
        ...

    @abc.abstractmethod
    def glom(self):
        """
        coalesces all data within partition into a list

        Returns
        -------
        CTableABC
           a table (the example below calls ``collect()`` on it) holding one
           ``(key, list of pairs)`` entry per coalesced partition.
           First element of each tuple is chosen from key of last element of each partition.

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
        >>> list(a)
        [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
        """
        ...

    @abc.abstractmethod
    def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
        """
        return a sampled subset of this Table.

        Parameters
        ----------
        fraction: float
          Expected size of the sample as a fraction of this table's size.
          Without replacement: probability that each element is chosen; fraction must be in [0, 1].
          With replacement: expected number of times each element is chosen.
        num: int
          Exact number of elements to sample from this table
        seed: int
          Seed of the random number generator. Use current timestamp when `None` is passed.

        Returns
        -------
        CTableABC
           a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
        >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
        True

        Notes
        -------
        use one of ``fraction`` and ``num``, not both; both are keyword-only

        """
        ...

    @abc.abstractmethod
    def filter(self, func):
        """
        returns a new table containing only those pairs whose key and value satisfy the predicate ``func``.

        Parameters
        ----------
        func: typing.Callable[[object, object], bool]
           Predicate function taking (key, value) and returning a boolean.

        Returns
        -------
        CTableABC
           A new table containing results.

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
        >>> b = a.filter(lambda k, v : k % 2 == 0)
        >>> list(b.collect())
        [(0, 0), (2, 2)]
        >>> c = a.filter(lambda k, v : v % 2 != 0)
        >>> list(c.collect())
        [(1, 1)]
        """
        ...

    @abc.abstractmethod
    def join(self, other, func):
        """
        returns intersection of this table and the other table.

        function ``func`` will be applied to values of keys that exist in both table.

        Parameters
        ----------
        other: CTableABC
          another table to be operated with.
        func: ``typing.Callable[[object, object], object]``
          the function applying to values whose key exists in both tables.
          This abstract signature declares no default, so it has to be passed explicitly
          (implementations may choose to provide one).

        Returns
        -------
        CTableABC
          a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)	# [(0, 1), (1, 2), (2, 3)]
        >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
        >>> c = a.join(b, lambda v1, v2 : v1 + v2)
        >>> list(c.collect())
        [(1, 3), (2, 5)]
        """
        ...

    @abc.abstractmethod
    def union(self, other, func=lambda v1, v2: v1):
        """
        returns union of this table and the other table.

        function ``func`` will be applied to values of keys that exist in both table.

        Parameters
        ----------
        other: CTableABC
          another table to be operated with.
        func: ``typing.Callable[[object, object], object]``
          The function applying to values whose key exists in both tables.
          The default keeps the left (this) table's value.

        Returns
        -------
        CTableABC
          a new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)	# [(0, 1), (1, 2), (2, 3)]
        >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
        >>> c = a.union(b, lambda v1, v2 : v1 + v2)
        >>> list(c.collect())
        [(0, 1), (1, 3), (2, 5), (3, 3)]
        """
        ...

    @abc.abstractmethod
    def subtractByKey(self, other):
        """
        returns a new table containing the elements whose keys are in this table but not in the other table.

        Parameters
        ----------
        other: CTableABC
          Another table whose keys are subtracted from this table.

        Returns
        -------
        CTableABC
          A new table

        Examples
        --------
        >>> from fate_arch.session import computing_session
        >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
        >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
        >>> c = a.subtractByKey(b)
        >>> list(c.collect())
        [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
        """
        ...

    @property
    def schema(self):
        """
        schema attached to this table

        Stored on the instance as ``_schema``; an empty dict is created and stored
        on first access if none has been set.
        """
        if not hasattr(self, "_schema"):
            setattr(self, "_schema", {})
        return getattr(self, "_schema")

    @schema.setter
    def schema(self, value):
        """
        replace the schema of this table with ``value`` (stored as ``_schema``)
        """
        setattr(self, "_schema", value)

Attributes

engine property readonly

get the engine name of table

Returns:

Type Description
str

name of the computing engine backing this table

partitions property readonly

get the partitions of table

Returns:

Type Description
int

number of partitions

schema property writable

Methods

save(self, address, partitions, schema, **kwargs)

save table

Parameters:

Name Type Description Default
address AddressABC

address to save table to

required
partitions int

number of partitions to save as

required
schema dict

table schema

required
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
    """
    save table

    Parameters
    ----------
    address: AddressABC
       address to save table to
    partitions: int
       number of partitions to save as
    schema: dict
       table schema
    kwargs:
       additional keyword arguments; their meaning is not specified by this interface
    """
    ...
collect(self, **kwargs)

collect data from table

Returns:

Type Description
Generator

generator of data

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def collect(self, **kwargs) -> typing.Generator:
    """
    collect data from table

    Returns
    -------
    generator
       generator over the data of the table

    Notes
    ------
    no order guarantee
    """
    ...
take(self, n=1, **kwargs)

take n data from table

Parameters:

Name Type Description Default
n int

number of data to take

1

Returns:

Type Description
list

a list of n data

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def take(self, n=1, **kwargs):
    """
    take ``n`` data from table

    Parameters
    ----------
    n: int
      number of data to take, defaults to 1

    Returns
    -------
    list
       a list of ``n`` data

    Notes
    ------
    no order guarantee
    """
    ...
first(self, **kwargs)

take one data from table

Returns:

Type Description
object

a data from table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def first(self, **kwargs):
    """
    take one data from table

    Returns
    -------
    object
      a data from table

    Notes
    -------
    no order guarantee
    """
    ...
count(self)

number of data in table

Returns:

Type Description
int

number of data

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def count(self) -> int:
    """
    count the data in table

    Returns
    -------
    int
       number of data in the table
    """
    ...
map(self, func)

apply func to each data

Parameters:

Name Type Description Default
func ``Callable[[object, object], Tuple[object, object]]``

function map (k1, v1) to (k2, v2)

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[('k1', 1), ('k2', 4), ('k3', 9)]

Returns:

Type Description
CTableABC

A new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def map(self, func) -> 'CTableABC':
    """
    apply `func` to each data

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
       function map (k1, v1) to (k2, v2)

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
    >>> b = a.map(lambda k, v: (k, v**2))
    >>> list(b.collect())
    [('k1', 1), ('k2', 4), ('k3', 9)]
    """
    ...
mapValues(self, func)

apply func to each value of data

Parameters:

Name Type Description Default
func ``Callable[[object], object]``

map v1 to v2

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]

Returns:

Type Description
CTableABC

A new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapValues(self, func):
    """
    apply `func` to the value of each data, as the example below shows keys are kept

    Parameters
    ----------
    func: ``typing.Callable[[object], object]``
       map v1 to v2

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
    >>> b = a.mapValues(lambda x: len(x))
    >>> list(b.collect())
    [('a', 3), ('b', 1)]
    """
    ...
mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False)

apply func to each partition of table

Parameters:

Name Type Description Default
func ``Callable[[iter], list]``

accept an iterator of pair, return a list of pair

required
use_previous_behavior bool

this parameter is provided for compatibility reasons: if set to True, calling this method calls applyPartitions instead

True
preserves_partitioning bool

flag indicate whether the func will preserve partition

False

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
...     s = 0
...     for k, v in iterator:
...         s += v
...     return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
    """
    apply ``func`` to each partition of table

    Parameters
    ----------
    func: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    use_previous_behavior: bool
       provided for compatibility reasons: if set to True, calling this method
       calls ``applyPartitions`` instead
    preserves_partitioning: bool
       flag indicating whether `func` preserves the partitioning

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
    >>> def f(iterator):
    ...     s = 0
    ...     for k, v in iterator:
    ...         s += v
    ...     return [(s, s)]
    ...
    >>> b = a.mapPartitions(f)
    >>> list(b.collect())
    [(6, 6), (9, 9)]
    """
    ...
mapReducePartitions(self, mapper, reducer, **kwargs)

apply mapper to each partition of table and then perform reduce by key operation with reducer

Parameters:

Name Type Description Default
mapper ``Callable[[iter], list]``

accept an iterator of pair, return a list of pair

required
reducer ``Callable[[object, object], object]``

reduce v1, v2 to v3

required

Examples:

>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
...     r = []
...     for k, v in it:
...         r.append((k % 3, v**2))
...         r.append((k % 2, v ** 3))
...     return r
>>> def _reducer(a, b):
...     return a + b
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapReducePartitions(self, mapper, reducer, **kwargs):
    """
    apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`

    Parameters
    ----------
    mapper: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    reducer: ``typing.Callable[[object, object], object]``
       reduce v1, v2 to v3

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
    >>> def _mapper(it):
    ...     r = []
    ...     for k, v in it:
    ...         r.append((k % 3, v**2))
    ...         r.append((k % 2, v ** 3))
    ...     return r
    >>> def _reducer(a, b):
    ...     return a + b
    >>> output = table.mapReducePartitions(_mapper, _reducer)
    >>> collected = dict(output.collect())
    >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
    >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
    >>> assert collected[2] == 3 ** 2
    """

    ...
applyPartitions(self, func)

apply func to each partitions as a single object

Parameters:

Name Type Description Default
func ``Callable[[iter], object]``

accept a iterator, return a object

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
...    r = []
...    for k, v in it:
...        r.append((v, v**2, v**3))
...    return r
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]

Returns:

Type Description
CTableABC

a new table, with each partition contains a single key-value pair

Source code in fate_arch/abc/_computing.py
def applyPartitions(self, func):
    """
    apply ``func`` to each partition as a single object

    Parameters
    ----------
    func: ``typing.Callable[[iter], object]``
       accepts an iterator, returns an object

    Returns
    -------
    CTableABC
       a new table, with each partition containing a single key-value pair

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
    >>> def f(it):
    ...    r = []
    ...    for k, v in it:
    ...        r.append((v, v**2, v**3))
    ...    return r
    >>> output = a.applyPartitions(f)
    >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]

    Notes
    ------
    this definition carries no ``abc.abstractmethod`` decorator; the body here is a stub
    """
    ...
flatMap(self, func)

apply a flat func to each data of table

Parameters:

Name Type Description Default
func ``Callable[[object, object], List[Tuple[object, object]]]``

a flat function accept two parameters return a list of pair

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def flatMap(self, func):
    """
    apply a flat ``func`` to each data of table

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
       a flat function that accepts two parameters (key, value) and returns a list of pairs

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
    >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
    >>> c = list(b.collect())
    >>> assert len(c) == 4
    >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
    """
    ...
reduce(self, func)

reduces all value in pair of table by a binary function func

Parameters:

Name Type Description Default
func Callable[[object, object], object]

binary function reduce two value into one

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
Notes

func should be associative

Returns:

Type Description
object

a single object

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def reduce(self, func):
    """
    reduces all values in the pairs of the table with a binary function `func`

    Parameters
    ----------
    func: typing.Callable[[object, object], object]
       binary function reducing two values into one

    Returns
    -------
    object
       a single object

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))

    Notes
    ------
    `func` should be associative
    """
    ...
glom(self)

coalesces all data within partition into a list

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]

Returns:

Type Description
CTableABC

a table (the example above calls collect() on it) holding one (key, list of pairs) entry per coalesced partition. First element of each tuple is chosen from key of last element of each partition.

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def glom(self):
    """
    coalesces all data within partition into a list

    Returns
    -------
    CTableABC
       a table (the example below calls ``collect()`` on it) holding one
       ``(key, list of pairs)`` entry per coalesced partition.
       First element of each tuple is chosen from key of last element of each partition.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
    >>> list(a)
    [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
    """
    ...
sample(self, *, fraction=None, num=None, seed=None)

return a sampled subset of this Table.

Parameters:

Name Type Description Default
fraction Optional[float]

Expected size of the sample as a fraction of this table's size. Without replacement: probability that each element is chosen; fraction must be in [0, 1]. With replacement: expected number of times each element is chosen.

None
num Optional[int]

Exact number of the sample from this table's size

None
seed int

Seed of the random number generator. Use current timestamp when None is passed.

None

Examples:

>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
Notes

use one of fraction and num, not both

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
    """
    return a sampled subset of this Table.

    Parameters
    ----------
    fraction: float
      Expected size of the sample as a fraction of this table's size.
      Without replacement: probability that each element is chosen; fraction must be in [0, 1].
      With replacement: expected number of times each element is chosen.
    num: int
      Exact number of elements to sample from this table
    seed: int
      Seed of the random number generator. Use current timestamp when `None` is passed.

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
    True

    Notes
    -------
    use one of ``fraction`` and ``num``, not both; both are keyword-only

    """
    ...
filter(self, func)

returns a new table containing only those keys which satisfy a predicate passed in via func.

Parameters:

Name Type Description Default
func Callable[[object, object], bool]

Predicate function returning a boolean.

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]

Returns:

Type Description
CTableABC

A new table containing results.

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def filter(self, func: typing.Callable[[object, object], bool]) -> 'CTableABC':
    """
    Return a new table containing only the entries that satisfy the predicate ``func``.

    This is an abstract declaration: the body is a bare ``...`` and only
    fixes the interface.

    Parameters
    ----------
    func: typing.Callable[[object, object], bool]
       Predicate function returning a boolean. As the examples below show,
       it is called with two arguments, the key and the value of an entry.

    Returns
    -------
    CTableABC
       A new table containing results.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
    >>> b = a.filter(lambda k, v : k % 2 == 0)
    >>> list(b.collect())
    [(0, 0), (2, 2)]
    >>> c = a.filter(lambda k, v : v % 2 != 0)
    >>> list(c.collect())
    [(1, 1)]
    """
    ...
join(self, other, func)

returns intersection of this table and the other table.

function func will be applied to the values of keys that exist in both tables.

Parameters:

Name Type Description Default
other CTableABC

another table to be operated with.

required
func Callable[[object, object], object]

the function applied to the values whose key exists in both tables. Note: join declares no default for func (unlike union, whose default keeps the left table's value).

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def join(self, other: 'CTableABC', func: typing.Callable[[object, object], object]) -> 'CTableABC':
    """
    Return the intersection of this table and the other table.

    Function ``func`` is applied to the values of keys that exist in both tables.
    This is an abstract declaration: the body is a bare ``...`` and only
    fixes the interface.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      the function applied to the values whose key exists in both tables;
      in the example below it receives this table's value first and the
      other table's value second.
      Required: this signature declares no default for ``func``.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.join(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(1, 3), (2, 5)]
    """
    ...
union(self, other, func=lambda v1, v2: v1)

returns union of this table and the other table.

function func will be applied to the values of keys that exist in both tables.

Parameters:

Name Type Description Default
other CTableABC

another table to be operated with.

required
func Callable[[object, object], object]

The function applied to the values whose key exists in both tables. By default the left table's value is kept.

lambda v1, v2: v1

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]

Returns:

Type Description
CTableABC

a new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def union(self, other: 'CTableABC', func: typing.Callable[[object, object], object] = lambda v1, v2: v1) -> 'CTableABC':
    """
    Return the union of this table and the other table.

    Function ``func`` is applied to the values of keys that exist in both tables.
    This is an abstract declaration: the body is a bare ``...`` and only
    fixes the interface.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      The function applied to the values whose key exists in both tables.
      The default, ``lambda v1, v2: v1``, returns its first argument, i.e.
      it keeps the left table's value.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)  # [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.union(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(0, 1), (1, 3), (2, 5), (3, 3)]
    """
    ...
subtractByKey(self, other)

returns a new table containing elements only in this table but not in the other table.

Parameters:

Name Type Description Default
other CTableABC

Another table whose keys are to be subtracted from this table.

required

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]

Returns:

Type Description
CTableABC

A new table

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def subtractByKey(self, other: 'CTableABC') -> 'CTableABC':
    """
    Return a new table containing the elements that are in this table but not in the other table.

    This is an abstract declaration: the body is a bare ``...`` and only
    fixes the interface. In the example below, entries are dropped when
    their key also appears in ``other``.

    Parameters
    ----------
    other: CTableABC
      Another table whose keys are to be subtracted from this table.

    Returns
    -------
    CTableABC
      A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
    >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
    >>> c = a.subtractByKey(b)
    >>> list(c.collect())
    [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
    """
    ...

Last update: 2021-11-15
Back to top