Computing API
Most of the time, federatedml users do not need to initialize a computing session themselves, because FATE Flow already does this for them. The exception is writing unit tests that involve CTable-related functions. To initialize a computing session:
from fate_arch.session import computing_session
# initialize
computing_session.init(work_mode=0, backend=0, session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)
- class computing_session
- static init(session_id, work_mode=0, backend=0)
initialize a computing session
- Parameters
session_id (str) – session id
work_mode (int) – work mode, 0 for standalone, 1 for cluster
backend (int) – computing backend, 0 for eggroll, 1 for spark
- Returns
computing session
- Return type
instance of concrete subclass of CSessionABC
- static parallelize(data: Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC
create table from iterable data
- Parameters
data (Iterable) – data to create table from
partition (int) – number of partitions of created table
include_key (bool) – True to create the table directly from data, False to create the table with generated keys starting from 0
- Returns
a table created from data
- Return type
instance of concrete subclass of CTableABC
- static stop()
stop session
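The effect of include_key can be modelled in plain Python without a running session. The sketch below is not FATE code: as_pairs is a hypothetical helper that only mimics the behaviour documented above, where include_key=True takes the items as (key, value) pairs and include_key=False generates keys starting from 0.

```python
from typing import Any, Iterable, List, Tuple


def as_pairs(data: Iterable[Any], include_key: bool) -> List[Tuple[Any, Any]]:
    """Hypothetical helper: model how parallelize interprets `data`."""
    if include_key:
        # Items are already (key, value) pairs and are used as they are.
        return [(k, v) for k, v in data]
    # Keys are generated, starting from 0.
    return list(enumerate(data))


generated = as_pairs(["a", "b", "c"], include_key=False)
direct = as_pairs([("k1", 1), ("k2", 2)], include_key=True)
print(generated)
print(direct)
```

A real table is distributed over partitions and gives no ordering guarantee, so this list-based model only describes which pairs exist, not where they live.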
After creating a table with a computing session, many distributed computing APIs are available.
distributed computing
Classes:
CTableABC – a table of pair-like data that supports distributed processing
CSessionABC – computing session to load/create/clean tables
- class CTableABC
a table of pair-like data that supports distributed processing
Attributes:
partitions – get the partitions of table
Methods:
save(address, partitions, schema, **kwargs) – save table
collect(**kwargs) – collect data from table
take([n]) – take n data from table
first(**kwargs) – take one data from table
count() – number of data in table
map(func) – apply func to each data
mapValues(func) – apply func to each value of data
mapPartitions(func[, use_previous_behavior, ...]) – apply func to each partition of table
mapReducePartitions(mapper, reducer, **kwargs) – apply mapper to each partition of table and then perform reduce by key operation with reducer
applyPartitions(func) – apply func to each partition as a single object
flatMap(func) – apply a flat func to each data of table
reduce(func) – reduce all values in the pairs of the table with a binary function func
glom() – coalesce all data within each partition into a list
sample(*[, fraction, num, seed]) – return a sampled subset of this table
filter(func) – return a new table containing only those keys which satisfy a predicate passed in via func
join(other, func) – return the intersection of this table and the other table
union(other[, func]) – return the union of this table and the other table
subtractByKey(other) – return a new table containing elements only in this table but not in the other table
- abstract property partitions
get the partitions of table
- Returns
number of partitions
- Return type
int
- abstract save(address: fate_arch.abc._address.AddressABC, partitions: int, schema: dict, **kwargs)
save table
- Parameters
address (AddressABC) – address to save table to
partitions (int) – number of partitions to save as
schema (dict) – table schema
- abstract collect(**kwargs) → Generator
collect data from table
- Returns
generator of data
- Return type
generator
Notes
no order guarantee
- abstract take(n=1, **kwargs)
take n data from table
- Parameters
n (int) – number of data to take
- Returns
a list of n data
- Return type
list
Notes
no order guarantee
- abstract first(**kwargs)
take one data from table
- Returns
a data from table
- Return type
object
Notes
no order guarantee
- abstract count() → int
number of data in table
- Returns
number of data
- Return type
int
- abstract map(func) → fate_arch.abc._computing.CTableABC
apply func to each data
- Parameters
func (typing.Callable[[object, object], typing.Tuple[object, object]]) – function mapping (k1, v1) to (k2, v2)
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[('k1', 1), ('k2', 4), ('k3', 9)]
- abstract mapValues(func)
apply func to each value of data
- Parameters
func (typing.Callable[[object], object]) – function mapping v1 to v2
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
- abstract mapPartitions(func, use_previous_behavior=True, preserves_partitioning=False)
apply func to each partition of table
- Parameters
func (typing.Callable[[iter], list]) – accepts an iterator of pairs, returns a list of pairs
use_previous_behavior (bool) – provided for compatibility; if set to True, calling this method calls applyPartitions instead
preserves_partitioning (bool) – flag indicating whether func preserves the partitioning
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
...     s = 0
...     for k, v in iterator:
...         s += v
...     return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
- abstract mapReducePartitions(mapper, reducer, **kwargs)
apply mapper to each partition of table and then perform reduce by key operation with reducer
- Parameters
mapper (typing.Callable[[iter], list]) – accepts an iterator of pairs, returns a list of pairs
reducer (typing.Callable[[object, object], object]) – reduces v1, v2 to v3
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
...     r = []
...     for k, v in it:
...         r.append((k % 3, v ** 2))
...         r.append((k % 2, v ** 3))
...     return r
...
>>> def _reducer(a, b):
...     return a + b
...
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
- applyPartitions(func)
apply func to each partition as a single object
- Parameters
func (typing.Callable[[iter], object]) – accepts an iterator, returns an object
- Returns
a new table in which each partition contains a single key-value pair
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
...     r = []
...     for k, v in it:
...         r.append((v, v**2, v**3))
...     return r
...
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
- abstract flatMap(func)
apply a flat func to each data of table
- Parameters
func (typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]) – a flat function that accepts two parameters and returns a list of pairs
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
- abstract reduce(func)
reduce all values in the pairs of the table with a binary function func
- Parameters
func (typing.Callable[[object, object], object]) – binary function that reduces two values into one
Notes
func should be associative
- Returns
a single object
- Return type
object
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
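Why func should be associative becomes visible once the reduction is split across partitions. The following plain-Python sketch assumes a two-stage scheme (reduce inside each partition, then reduce the partial results); partitioned_reduce is a hypothetical model and not how any particular backend is implemented.

```python
import operator
from functools import reduce
from typing import Any, Callable, List


def partitioned_reduce(partitions: List[List[Any]], func: Callable[[Any, Any], Any]) -> Any:
    """Hypothetical model: reduce each non-empty partition, then the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


values = list(range(10))
two_parts = [values[:5], values[5:]]
three_parts = [values[:3], values[3:7], values[7:]]

# Addition is associative, so the grouping into partitions does not matter.
add_two = partitioned_reduce(two_parts, operator.add)
add_three = partitioned_reduce(three_parts, operator.add)

# Subtraction is not associative, so the result changes with the partitioning.
sub_two = partitioned_reduce(two_parts, operator.sub)
sub_three = partitioned_reduce(three_parts, operator.sub)

print(add_two, add_three, sub_two, sub_three)
```

With an associative function the partition count is an implementation detail; with a non-associative one the same table can yield different results for different values of partition.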
- abstract glom()
coalesce all data within each partition into a list
- Returns
list containing all coalesced partitions and their elements. The first element of each tuple is the key of the last element of the corresponding partition.
- Return type
list
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
- abstract sample(*, fraction: Optional[float] = None, num: Optional[int] = None, seed=None)
return a sampled subset of this table
- Parameters
fraction (float) – expected size of the sample as a fraction of this table's size. Without replacement: the probability that each element is chosen; fraction must be in [0, 1]. With replacement: the expected number of times each element is chosen.
num (int) – exact number of elements to sample from this table
seed (int) – seed of the random number generator. The current timestamp is used when None is passed.
Notes
use one of fraction and num, not both
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
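The difference between fraction and num can be illustrated with a small plain-Python model. It rests on assumptions that the reference above does not confirm: sampling without replacement, an independent draw per element for fraction, and a ValueError when the two arguments are not used exclusively. sample_pairs is a hypothetical helper, and its random stream differs from any real backend, so sampled elements will not match FATE output for the same seed.

```python
import random
from typing import Any, List, Optional, Tuple

Pair = Tuple[Any, Any]


def sample_pairs(
    pairs: List[Pair],
    *,
    fraction: Optional[float] = None,
    num: Optional[int] = None,
    seed: Optional[int] = None,
) -> List[Pair]:
    """Hypothetical model of sampling without replacement."""
    if (fraction is None) == (num is None):
        # Assumption: exactly one of the two arguments must be given.
        raise ValueError("pass exactly one of fraction or num")
    rng = random.Random(seed)
    if num is not None:
        # Exact sample size.
        return rng.sample(pairs, num)
    # Each element is kept independently with probability `fraction`,
    # so the sample size is only expected, not exact.
    return [p for p in pairs if rng.random() < fraction]


pairs = list(enumerate(range(100)))
exact = sample_pairs(pairs, num=10, seed=81)
approx = sample_pairs(pairs, fraction=0.1, seed=81)
print(len(exact), len(approx))
```

This is also why the example above checks a range for the count instead of an exact value: with fraction, only the expected size is fixed.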
- abstract filter(func)
return a new table containing only those keys which satisfy a predicate passed in via func
- Parameters
func (Callable[[object, object], bool]) – predicate function returning a boolean
- Returns
A new table containing results.
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
- abstract join(other, func)
return the intersection of this table and the other table. The function func is applied to the values of keys that exist in both tables.
- Parameters
other (CTableABC) – another table to be operated with.
func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables. default using left table's value.
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
- abstract union(other, func=<function CTableABC.<lambda>>)
return the union of this table and the other table. The function func is applied to the values of keys that exist in both tables.
- Parameters
other (CTableABC) – another table to be operated with.
func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables. default using left table's value.
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
- abstract subtractByKey(other)
return a new table containing elements only in this table but not in the other table
- Parameters
other (CTableABC) – another table whose keys are subtracted from this table
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
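The key-based semantics of join, union and subtractByKey can be compared side by side with ordinary dictionaries. The functions below are hypothetical stand-ins written for this illustration, not part of fate_arch; they ignore partitioning and ordering and only model which keys and values end up in the result.

```python
from typing import Any, Callable, Dict

Table = Dict[Any, Any]


def join(left: Table, right: Table, func: Callable[[Any, Any], Any]) -> Table:
    """Keep keys present in both tables; combine their values with func."""
    return {k: func(v, right[k]) for k, v in left.items() if k in right}


def union(left: Table, right: Table,
          func: Callable[[Any, Any], Any] = lambda v1, v2: v1) -> Table:
    """Keep keys from either table; func resolves keys present in both."""
    out = dict(left)
    for k, v in right.items():
        out[k] = func(out[k], v) if k in out else v
    return out


def subtract_by_key(left: Table, right: Table) -> Table:
    """Keep pairs of left whose key does not appear in right."""
    return {k: v for k, v in left.items() if k not in right}


# Same data as the join and union examples above.
a = {0: 1, 1: 2, 2: 3}  # parallelize([1, 2, 3], include_key=False)
b = {1: 1, 2: 2, 3: 3}  # parallelize([(1, 1), (2, 2), (3, 3)], include_key=True)

joined = join(a, b, lambda v1, v2: v1 + v2)
merged = union(a, b, lambda v1, v2: v1 + v2)
left_only = subtract_by_key(a, b)
print(joined, merged, left_only)
```

The default func in this model's union keeps the left table's value, mirroring the default described for union above.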
- class CSessionABC
computing session to load/create/clean tables
Methods:
load(address, partitions, schema, **kwargs) – load a table from given address
parallelize(data, partition, include_key, ...) – create table from iterable data
cleanup(name, namespace) – delete table(s)
Attributes:
session_id – get computing session id
- abstract load(address: fate_arch.abc._address.AddressABC, partitions, schema: dict, **kwargs) → Union[fate_arch.abc._path.PathABC, fate_arch.abc._computing.CTableABC]
load a table from given address
- Parameters
address (AddressABC) – address to load table from
partitions (int) – number of partitions of loaded table
schema (dict) – schema associated with this table
- Returns
a table in memory
- Return type
Union[PathABC, CTableABC]
- abstract parallelize(data: collections.abc.Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC
create table from iterable data
- Parameters
data (Iterable) – data to create table from
partition (int) – number of partitions of created table
include_key (bool) – True to create the table directly from data, False to create the table with generated keys starting from 0
- Returns
a table created from data
- Return type
CTableABC
- abstract cleanup(name, namespace)
delete table(s)
- Parameters
name (str) – table name or wildcard character
namespace (str) – namespace
- abstract property session_id: str
get computing session id
- Returns
computing session id
- Return type
str