Computing API

In most cases, federatedml users do not need to initialize a computing session themselves, because FATE Flow already takes care of it. The main exception is writing unit tests that involve CTable-related functions. To initialize a computing session:

from fate_arch.session import computing_session
# initialize
computing_session.init(work_mode=0, backend=0, session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)
class computing_session
static init(session_id, work_mode=0, backend=0)

initialize a computing session

Parameters
  • session_id (str) – session id

  • work_mode (int) – work mode, 0 for standalone, 1 for cluster

  • backend (int) – computing backend, 0 for eggroll, 1 for spark

Returns

computing session

Return type

instance of concrete subclass of CSessionABC

static parallelize(data: Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC

create table from iterable data

Parameters
  • data (Iterable) – data to create table from

  • partition (int) – number of partitions of created table

  • include_key (bool) – True to create the table directly from data, whose elements must then be (key, value) pairs; False to create the table with generated keys starting from 0

Returns

a table created from data

Return type

instance of a concrete subclass of CTableABC
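To make the include_key semantics concrete, here is a minimal pure-Python sketch; it is not fate_arch code. It models a table as a list of partitions, each a list of (key, value) tuples, and the helper parallelize_sketch is hypothetical. Placing pairs by hash(key) % partition is an assumption; real backends choose their own partitioner.

```python
from typing import Any, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def parallelize_sketch(data: Iterable, partition: int, include_key: bool) -> List[List[Pair]]:
    """Hypothetical model of parallelize: returns a list of partitions of (key, value) pairs."""
    if include_key:
        # data must already consist of (key, value) pairs
        pairs = [(k, v) for k, v in data]
    else:
        # keys are generated as 0, 1, 2, ... in iteration order
        pairs = list(enumerate(data))
    partitions: List[List[Pair]] = [[] for _ in range(partition)]
    for k, v in pairs:
        # assumed partitioner: bucket by hash(key); real backends may differ
        partitions[hash(k) % partition].append((k, v))
    return partitions


parts = parallelize_sketch(["a", "b", "c"], partition=2, include_key=False)
print(sorted(pair for part in parts for pair in part))  # [(0, 'a'), (1, 'b'), (2, 'c')]
```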

static stop()

stop session

Once a table has been created with the computing session, the following distributed computing APIs are available on it.

distributed computing

Classes:

CTableABC()

a table of pair-like data that supports distributed processing

CSessionABC()

computing session to load/create/clean tables

class CTableABC

a table of pair-like data that supports distributed processing

Attributes:

partitions

get the partitions of table

Methods:

save(address, partitions, schema, **kwargs)

save table

collect(**kwargs)

collect data from table

take([n])

take n items from the table

first(**kwargs)

take one item from the table

count()

number of items in the table

map(func)

apply func to each key-value pair

mapValues(func)

apply func to the value of each pair

mapPartitions(func[, use_previous_behavior, …])

apply func to each partition of table

mapReducePartitions(mapper, reducer, **kwargs)

apply mapper to each partition of the table, then perform a reduce-by-key operation with reducer

applyPartitions(func)

apply func to each partition as a single object

flatMap(func)

apply a flat func to each pair of the table

reduce(func)

reduces the values of all pairs in the table with a binary function func

glom()

coalesces all data within each partition into a list

sample(*[, fraction, num, seed])

return a sampled subset of this Table.

filter(func)

returns a new table containing only those pairs that satisfy the predicate func.

join(other, func)

returns the intersection of this table and the other table.

union(other[, func])

returns the union of this table and the other table.

subtractByKey(other)

returns a new table containing elements only in this table but not in the other table.

abstract property partitions

get the partitions of table

Returns

number of partitions

Return type

int

abstract save(address: fate_arch.abc._address.AddressABC, partitions: int, schema: dict, **kwargs)

save table

Parameters
  • address (AddressABC) – address to save table to

  • partitions (int) – number of partitions to save as

  • schema (dict) – table schema

abstract collect(**kwargs) → Generator

collect data from table

Returns

generator of data

Return type

generator

Notes

no order guarantee

abstract take(n=1, **kwargs)

take n data from table

Parameters

n (int) – number of items to take

Returns

a list of n items

Return type

list

Notes

no order guarantee

abstract first(**kwargs)

take one item from the table

Returns

one item from the table

Return type

object

Notes

no order guarantee

abstract count() → int

number of items in the table

Returns

number of items

Return type

int

abstract map(func) → fate_arch.abc._computing.CTableABC

apply func to each key-value pair

Parameters

func (typing.Callable[[object, object], typing.Tuple[object, object]]) – function mapping (k1, v1) to (k2, v2)

Returns

A new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[('k1', 1), ('k2', 4), ('k3', 9)]
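Because map may change keys, pairs can end up in different partitions than they started in. The pure-Python sketch below (not the fate_arch implementation) illustrates this under the same hypothetical list-of-partitions model, re-bucketing each output pair by hash(new_key) % n; whether and how a real backend repartitions after map is an assumption here. map_sketch is a hypothetical helper.

```python
from typing import Any, Callable, List, Tuple

Pair = Tuple[Any, Any]


def map_sketch(partitions: List[List[Pair]], func: Callable[[Any, Any], Pair]) -> List[List[Pair]]:
    """Hypothetical model of CTableABC.map over a list of partitions."""
    n = len(partitions)
    out: List[List[Pair]] = [[] for _ in range(n)]
    for part in partitions:
        for k, v in part:
            k2, v2 = func(k, v)
            # func may change the key, so re-bucket by the new key (assumed hash partitioner)
            out[hash(k2) % n].append((k2, v2))
    return out


table = [[(0, 1), (2, 3)], [(1, 2)]]  # keys placed by hash(key) % 2
squared = map_sketch(table, lambda k, v: (k, v ** 2))
print(sorted(pair for part in squared for pair in part))  # [(0, 1), (1, 4), (2, 9)]
```

When func keeps keys unchanged, as in the squaring example, every pair stays in its original partition; mapValues guarantees that by construction.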
abstract mapValues(func)

apply func to the value of each pair

Parameters

func (typing.Callable[[object], object]) – function mapping v1 to v2

Returns

A new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
abstract mapPartitions(func, use_previous_behavior=True, preserves_partitioning=False)

apply func to each partition of table

Parameters
  • func (typing.Callable[[iter], list]) – accepts an iterator of pairs and returns a list of pairs

  • use_previous_behavior (bool) – provided for compatibility; if True, calling this method calls applyPartitions instead

  • preserves_partitioning (bool) – flag indicating whether func preserves partitioning

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
...     s = 0
...     for k, v in iterator:
...         s += v
...     return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
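The per-partition contract can be sketched in pure Python (a hypothetical helper, not fate_arch code). It assumes the behavior in which the pairs returned by func form the new partition, which is what the use_previous_behavior description implies for False, and it assumes the contiguous split of [1, 2, 3, 4, 5] that the example output reflects.

```python
from typing import Any, Callable, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def map_partitions_sketch(
    partitions: List[List[Pair]], func: Callable[[Iterator[Pair]], List[Pair]]
) -> List[List[Pair]]:
    """Hypothetical model: func sees one partition's pairs; its returned pairs form the new partition."""
    return [list(func(iter(part))) for part in partitions]


def partial_sum(iterator: Iterator[Pair]) -> List[Pair]:
    # same logic as f in the example above: sum the values of one partition
    s = 0
    for _, v in iterator:
        s += v
    return [(s, s)]


# contiguous split of [1, 2, 3, 4, 5] into two partitions (assumed)
table = [[(0, 1), (1, 2), (2, 3)], [(3, 4), (4, 5)]]
result = map_partitions_sketch(table, partial_sum)
print(result)  # [[(6, 6)], [(9, 9)]]
```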
abstract mapReducePartitions(mapper, reducer, **kwargs)

apply mapper to each partition of the table, then perform a reduce-by-key operation with reducer

Parameters
  • mapper (typing.Callable[[iter], list]) – accepts an iterator of pairs and returns a list of pairs

  • reducer (typing.Callable[[object, object], object]) – function reducing v1 and v2 to v3

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
...     r = []
...     for k, v in it:
...         r.append((k % 3, v**2))
...         r.append((k % 2, v ** 3))
...     return r
>>> def _reducer(a, b):
...     return a + b
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
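Ignoring distribution, mapReducePartitions amounts to running mapper on every partition and then folding all values emitted under the same key with reducer. The pure-Python sketch below (a hypothetical helper, not the fate_arch implementation) reproduces the values asserted in the example, assuming the four input pairs are split into two partitions as shown.

```python
from collections import defaultdict
from functools import reduce
from operator import add
from typing import Any, Callable, Dict, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def map_reduce_partitions_sketch(
    partitions: List[List[Pair]],
    mapper: Callable[[Iterator[Pair]], List[Pair]],
    reducer: Callable[[Any, Any], Any],
) -> Dict[Any, Any]:
    """Hypothetical model: run mapper on each partition, then reduce the emitted values by key."""
    grouped: Dict[Any, List[Any]] = defaultdict(list)
    for part in partitions:
        for k, v in mapper(iter(part)):
            grouped[k].append(v)
    # fold all values emitted under the same key with the binary reducer
    return {k: reduce(reducer, vs) for k, vs in grouped.items()}


def mapper(it: Iterator[Pair]) -> List[Pair]:
    r = []
    for k, v in it:
        r.append((k % 3, v ** 2))
        r.append((k % 2, v ** 3))
    return r


table = [[(1, 2), (3, 4)], [(2, 3), (4, 5)]]
result = map_reduce_partitions_sketch(table, mapper, add)
print(result)  # {1: 101, 0: 168, 2: 9}
```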
applyPartitions(func)

apply func to each partition as a single object

Parameters

func (typing.Callable[[iter], object]) – accepts an iterator and returns an object

Returns

a new table in which each partition contains a single key-value pair

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
...     r = []
...     for k, v in it:
...         r.append((v, v**2, v**3))
...     return r
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
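The difference from mapPartitions is that func's whole return value becomes one value per partition. The pure-Python sketch below is a hypothetical model, not fate_arch code; in particular, taking the key from each partition's last pair is an assumption borrowed from the glom description, and skipping empty partitions is a choice of this sketch.

```python
from typing import Any, Callable, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def apply_partitions_sketch(
    partitions: List[List[Pair]], func: Callable[[Iterator[Pair]], Any]
) -> List[List[Pair]]:
    """Hypothetical model: each non-empty partition collapses to one (key, func(iterator)) pair.

    The key is taken from the partition's last pair (an assumption); empty
    partitions are skipped in this sketch.
    """
    return [[(part[-1][0], func(iter(part)))] for part in partitions if part]


def f(it: Iterator[Pair]) -> List[Tuple[int, int, int]]:
    r = []
    for _, v in it:
        r.append((v, v ** 2, v ** 3))
    return r


table = [[(0, 1)], [(1, 2)], [(2, 3)]]  # three elements over three partitions
output = apply_partitions_sketch(table, f)
print([v[0] for part in output for _, v in part])  # [(1, 1, 1), (2, 4, 8), (3, 9, 27)]
```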
abstract flatMap(func)

apply a flat func to each pair of the table

Parameters

func (typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]) – a flat function that accepts two parameters (key and value) and returns a list of pairs

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
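The flattening step can be sketched on a flat list of pairs, ignoring partitioning. flat_map_sketch is a hypothetical helper, not part of fate_arch; it shows that each input pair may expand into zero or more output pairs.

```python
from typing import Any, Callable, List, Tuple

Pair = Tuple[Any, Any]


def flat_map_sketch(pairs: List[Pair], func: Callable[[Any, Any], List[Pair]]) -> List[Pair]:
    """Hypothetical model of flatMap on a flat list of pairs (partitioning ignored)."""
    # each input pair expands into however many pairs func returns
    return [out for k, v in pairs for out in func(k, v)]


result = flat_map_sketch([(1, 1), (2, 2)], lambda x, y: [(x, y), (x + 10, y ** 2)])
print(result)  # [(1, 1), (11, 1), (2, 2), (12, 4)]
```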
abstract reduce(func)

reduces the values of all pairs in the table with a binary function func

Parameters

func (typing.Callable[[object, object], object]) – binary function that reduces two values into one

Notes

func should be associative

Returns

a single object

Return type

object

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
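Why func should be associative: a distributed reduce typically reduces each partition locally and then combines the partial results, so the grouping of calls depends on the partitioning. The pure-Python sketch below assumes that two-stage scheme (a hypothetical model, not the fate_arch implementation). Addition gives the same result under two different partitionings; subtraction, which is not associative, does not.

```python
from functools import reduce
from operator import add, sub
from typing import Any, Callable, List, Tuple

Pair = Tuple[Any, Any]


def reduce_sketch(partitions: List[List[Pair]], func: Callable[[Any, Any], Any]) -> Any:
    """Hypothetical two-stage model of CTableABC.reduce.

    Each non-empty partition is reduced locally, then the partial results are combined.
    """
    partials = [reduce(func, (v for _, v in part)) for part in partitions if part]
    return reduce(func, partials)


# the same 100 values under two different partitionings
split_a = [[(i, i) for i in range(100) if i % 4 == p] for p in range(4)]
split_b = [[(i, i) for i in range(50)], [(i, i) for i in range(50, 100)]]

print(reduce_sketch(split_a, add), reduce_sketch(split_b, add))  # 4950 4950
print(reduce_sketch(split_a, sub), reduce_sketch(split_b, sub))  # 2538 2400
```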
abstract glom()

coalesces all data within each partition into a list

Returns

a list containing one tuple per partition, pairing a key with the list of that partition's elements. The key of each tuple is the key of the last element in the partition.

Return type

list

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
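The example output is consistent with range(5) being placed into three partitions by key % 3. The pure-Python sketch below assumes that placement (it is not guaranteed by every backend) and uses a hypothetical helper, glom_sketch, to reproduce the output, including the rule that each tuple's key comes from the last element of its partition.

```python
from typing import Any, List, Tuple

Pair = Tuple[Any, Any]


def glom_sketch(partitions: List[List[Pair]]) -> List[Tuple[Any, List[Pair]]]:
    """Hypothetical model: one (key of last element, elements) tuple per non-empty partition."""
    return [(part[-1][0], list(part)) for part in partitions if part]


# range(5) over 3 partitions, assuming placement by key % 3
partitions = [[(k, k) for k in range(5) if k % 3 == p] for p in range(3)]
print(sorted(glom_sketch(partitions)))
# [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
```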
abstract sample(*, fraction: Optional[float] = None, num: Optional[int] = None, seed=None)

return a sampled subset of this Table.

Parameters
  • fraction (float) – Expected size of the sample as a fraction of this table’s size. Without replacement, this is the probability that each element is chosen, and fraction must be in [0, 1]; with replacement, it is the expected number of times each element is chosen.

  • num (int) – Exact number of elements to sample from this table

  • seed (int) – Seed of the random number generator. Use current timestamp when None is passed.

Notes

use one of fraction and num, not both

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
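The range check in the example (6 to 14 out of 100) reflects that fraction-based sampling yields a variable size, while num fixes it exactly. The pure-Python sketch below assumes Bernoulli sampling for fraction and exact sampling without replacement for num; sample_sketch is hypothetical, and each backend may sample differently. Note that here seed=None falls back to Python's default seeding rather than the current timestamp.

```python
import random
from typing import Any, List, Optional, Tuple

Pair = Tuple[Any, Any]


def sample_sketch(
    pairs: List[Pair],
    *,
    fraction: Optional[float] = None,
    num: Optional[int] = None,
    seed: Optional[int] = None,
) -> List[Pair]:
    """Hypothetical model of sample() without replacement on a flat list of pairs."""
    if (fraction is None) == (num is None):
        raise ValueError("specify exactly one of fraction and num")
    # seed=None uses Python's default seeding in this sketch, not a timestamp
    rng = random.Random(seed)
    if fraction is not None:
        if not 0.0 <= fraction <= 1.0:
            raise ValueError("fraction must be in [0, 1]")
        # Bernoulli sampling: each pair is kept independently with probability `fraction`,
        # so the result size is random with expectation fraction * len(pairs)
        return [p for p in pairs if rng.random() < fraction]
    # exact-size sampling without replacement
    return rng.sample(pairs, num)


data = [(i, i) for i in range(100)]
print(len(sample_sketch(data, num=10, seed=81)))  # 10
print(len(sample_sketch(data, fraction=0.1, seed=81)))  # size varies; expected value is 10
```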
abstract filter(func)

returns a new table containing only those pairs that satisfy the predicate func.

Parameters

func (typing.Callable[[object, object], bool]) – Predicate function returning a boolean.

Returns

A new table containing results.

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
abstract join(other, func)

returns the intersection of this table and the other table.

function func will be applied to the values of keys that exist in both tables.

Parameters
  • other (CTableABC) – another table to be operated with.

  • func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables. By default, the left table’s value is used.

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
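Ignoring partitioning, the key semantics of join can be sketched with plain Python dicts. join_sketch is a hypothetical helper, not the fate_arch API; it reproduces the example above, keeping only keys present in both tables and combining their values with func.

```python
from typing import Any, Callable, Dict


def join_sketch(left: Dict[Any, Any], right: Dict[Any, Any],
                func: Callable[[Any, Any], Any]) -> Dict[Any, Any]:
    """Hypothetical model of join on plain dicts: inner join by key."""
    return {k: func(v, right[k]) for k, v in left.items() if k in right}


a = dict(enumerate([1, 2, 3]))  # {0: 1, 1: 2, 2: 3}, like include_key=False
b = {1: 1, 2: 2, 3: 3}
print(join_sketch(a, b, lambda v1, v2: v1 + v2))  # {1: 3, 2: 5}
```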
abstract union(other, func=<function CTableABC.<lambda>>)

returns the union of this table and the other table.

function func will be applied to the values of keys that exist in both tables.

Parameters
  • other (CTableABC) – another table to be operated with.

  • func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables. By default, the left table’s value is used.

Returns

a new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
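The same dict-based view works for union: keys found in only one table are copied as-is, and func runs only on keys found in both. The default keeps the left value, as the parameter description states. union_sketch is a hypothetical helper, not the fate_arch API.

```python
from typing import Any, Callable, Dict


def union_sketch(left: Dict[Any, Any], right: Dict[Any, Any],
                 func: Callable[[Any, Any], Any] = lambda v1, v2: v1) -> Dict[Any, Any]:
    """Hypothetical model of union on plain dicts."""
    out = dict(left)
    for k, v in right.items():
        # combine only when the key exists in both tables
        out[k] = func(out[k], v) if k in out else v
    return out


a = dict(enumerate([1, 2, 3]))  # {0: 1, 1: 2, 2: 3}
b = {1: 1, 2: 2, 3: 3}
print(union_sketch(a, b, lambda v1, v2: v1 + v2))  # {0: 1, 1: 3, 2: 5, 3: 3}
print(union_sketch(a, b))  # {0: 1, 1: 2, 2: 3, 3: 3}
```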
abstract subtractByKey(other)

returns a new table containing elements only in this table but not in the other table.

Parameters

other (CTableABC) – the table whose keys are removed from this table.

Returns

A new table

Return type

CTableABC

Examples

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
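subtractByKey compares keys only; the other table's values play no role. A dict-based sketch (hypothetical helper, partitioning ignored) reproduces the example:

```python
from typing import Any, Dict


def subtract_by_key_sketch(left: Dict[Any, Any], right: Dict[Any, Any]) -> Dict[Any, Any]:
    """Hypothetical model: keep pairs of `left` whose key is absent from `right`."""
    return {k: v for k, v in left.items() if k not in right}


a = {k: k for k in range(10)}
b = {k: k for k in range(5)}
print(subtract_by_key_sketch(a, b))  # {5: 5, 6: 6, 7: 7, 8: 8, 9: 9}
```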
class CSessionABC

computing session to load/create/clean tables

Methods:

load(address, partitions, schema, **kwargs)

load a table from given address

parallelize(data, partition, include_key, …)

create table from iterable data

cleanup(name, namespace)

delete table(s)

Attributes:

session_id

get computing session id

abstract load(address: fate_arch.abc._address.AddressABC, partitions, schema: dict, **kwargs) → Union[fate_arch.abc._path.PathABC, fate_arch.abc._computing.CTableABC]

load a table from given address

Parameters
  • address (AddressABC) – address to load table from

  • partitions (int) – number of partitions of loaded table

  • schema (dict) – schema associated with this table

Returns

a table in memory

Return type

CTableABC

abstract parallelize(data: collections.abc.Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC

create table from iterable data

Parameters
  • data (Iterable) – data to create table from

  • partition (int) – number of partitions of created table

  • include_key (bool) – True to create the table directly from data, whose elements must then be (key, value) pairs; False to create the table with generated keys starting from 0

Returns

a table created from data

Return type

CTableABC

abstract cleanup(name, namespace)

delete table(s)

Parameters
  • name (str) – table name or wildcard character

  • namespace (str) – namespace

abstract property session_id: str

get computing session id

Returns

computing session id

Return type

str