Computing API
Most of the time, users of federatedml do not need to initialize a computing session themselves, because FATE Flow already does this for them. The exception is unit tests that involve CTable-related functions. To initialize a computing session:
from fate_arch.session import computing_session
# initialize
computing_session.init(work_mode=0, backend=0, session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)
class computing_session
static init(session_id, work_mode=0, backend=0)
initialize a computing session
- Parameters
session_id (str) – session id
work_mode (int) – work mode, 0 for standalone, 1 for cluster
backend (int) – computing backend, 0 for eggroll, 1 for spark
- Returns
computing session
- Return type
instance of a concrete subclass of CSessionABC
static parallelize(data: Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC
create table from iterable data
- Parameters
data (Iterable) – data to create table from
partition (int) – number of partitions of created table
include_key (bool) – True to create the table directly from data, whose items are then used as (key, value) pairs; False to create the table with generated keys starting from 0
- Returns
a table created from data
- Return type
instance of a concrete subclass of CTableABC
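The effect of include_key can be pictured with a plain-Python sketch. The helper to_pairs below is hypothetical and is not part of fate_arch; it only mimics how keys are chosen and says nothing about partitioning or distribution.

```python
from typing import Iterable, List, Tuple


def to_pairs(data: Iterable, include_key: bool) -> List[Tuple[object, object]]:
    """Hypothetical helper (not a fate_arch API) mimicking how keys are assigned."""
    if include_key:
        # Each item is already a (key, value) pair and is used as-is.
        return [(k, v) for k, v in data]
    # Keys are generated, counting up from 0 in iteration order.
    return list(enumerate(data))


with_keys = to_pairs([("k1", 1), ("k2", 2)], include_key=True)
generated = to_pairs(["a", "b", "c"], include_key=False)
```

Under this model, passing pairs with include_key=False would wrap each whole pair as a value under a generated key, which is rarely what a test intends.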
static stop()
stop session
After a table is created with the computing session, a number of distributed computing APIs are available on it.
distributed computing
Classes:
CTableABC | a table of pair-like data that supports distributed processing
CSessionABC | computing session to load/create/clean tables
class CTableABC
a table of pair-like data that supports distributed processing
Methods:
applyPartitions(func) – apply func to each partition as a single object
collect(**kwargs) – collect data from table
count() – number of data in table
filter(func) – returns a new table containing only those keys which satisfy a predicate passed in via func
first(**kwargs) – take one data from table
flatMap(func) – apply a flat func to each data of table
glom() – coalesces all data within partition into a list
join(other, func) – returns intersection of this table and the other table
map(func) – apply func to each data
mapPartitions(func[, use_previous_behavior, …]) – apply func to each partition of table
mapReducePartitions(mapper, reducer, **kwargs) – apply mapper to each partition of table and then perform reduce by key operation with reducer
mapValues(func) – apply func to each value of data
reduce(func) – reduces all values in the pairs of the table by a binary function func
sample(*[, fraction, num, seed]) – return a sampled subset of this table
save(address, partitions, schema, **kwargs) – save table
subtractByKey(other) – returns a new table containing elements only in this table but not in the other table
take([n]) – take n data from table
union(other[, func]) – returns union of this table and the other table
Attributes:
partitions – get the partitions of table
abstract property partitions
get the partitions of table
- Returns
number of partitions
- Return type
int
abstract save(address: fate_arch.abc._address.AddressABC, partitions: int, schema: dict, **kwargs)
save table
- Parameters
address (AddressABC) – address to save table to
partitions (int) – number of partitions to save as
schema (dict) – table schema
abstract collect(**kwargs) → Generator
collect data from table
- Returns
generator of data
- Return type
generator
Notes
no order guarantee
abstract take(n=1, **kwargs)
take n data from table
- Parameters
n (int) – number of data to take
- Returns
a list of n data
- Return type
list
abstract first(**kwargs)
take one data from table
- Returns
a data from table
- Return type
object
abstract count() → int
number of data in table
- Returns
number of data
- Return type
int
abstract map(func) → fate_arch.abc._computing.CTableABC
apply func to each data
- Parameters
func (typing.Callable[[object, object], typing.Tuple[object, object]]) – function mapping (k1, v1) to (k2, v2)
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[('k1', 1), ('k2', 4), ('k3', 9)]
abstract mapValues(func)
apply func to each value of data
- Parameters
func (typing.Callable[[object], object]) – function mapping v1 to v2
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
abstract mapPartitions(func, use_previous_behavior=True, preserves_partitioning=False)
apply func to each partition of table
- Parameters
func (typing.Callable[[iter], list]) – accepts an iterator of pairs, returns a list of pairs
use_previous_behavior (bool) – provided for compatibility; if set to True, calling this method calls applyPartitions instead
preserves_partitioning (bool) – flag indicating whether func preserves partitioning
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
...     s = 0
...     for k, v in iterator:
...         s += v
...     return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
abstract mapReducePartitions(mapper, reducer, **kwargs)
apply mapper to each partition of table and then perform reduce by key operation with reducer
- Parameters
mapper (typing.Callable[[iter], list]) – accepts an iterator of pairs, returns a list of pairs
reducer (typing.Callable[[object, object], object]) – reduces v1, v2 to v3
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
...     r = []
...     for k, v in it:
...         r.append((k % 3, v ** 2))
...         r.append((k % 2, v ** 3))
...     return r
...
>>> def _reducer(a, b):
...     return a + b
...
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
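The map-then-reduce-by-key flow described for mapReducePartitions can be modelled locally in plain Python. This is only a sketch: the function map_reduce_partitions and the way the four pairs are split into two partitions are assumptions made for illustration, not fate_arch behaviour.

```python
from functools import reduce
from typing import Callable, Dict, Iterable, List, Tuple

Pair = Tuple[object, object]


def map_reduce_partitions(
    partitions: List[List[Pair]],
    mapper: Callable[[Iterable[Pair]], List[Pair]],
    reducer: Callable[[object, object], object],
) -> Dict[object, object]:
    """Hypothetical local model: map each partition, then reduce by key."""
    grouped: Dict[object, List[object]] = {}
    for part in partitions:
        # The mapper sees one whole partition as an iterator of pairs.
        for key, value in mapper(iter(part)):
            grouped.setdefault(key, []).append(value)
    # Values sharing a key are folded together with the reducer.
    return {key: reduce(reducer, values) for key, values in grouped.items()}


def _mapper(it):
    out = []
    for k, v in it:
        out.append((k % 3, v ** 2))
        out.append((k % 2, v ** 3))
    return out


# Assumed split of the four pairs into two partitions.
parts = [[(1, 2), (2, 3)], [(3, 4), (4, 5)]]
result = map_reduce_partitions(parts, _mapper, lambda a, b: a + b)
```

Because the reducer here is addition, the totals per key do not depend on how the pairs are distributed over partitions.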
applyPartitions(func)
apply func to each partition as a single object
- Parameters
func (typing.Callable[[iter], object]) – accepts an iterator, returns an object
- Returns
a new table, with each partition containing a single key-value pair
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
...     r = []
...     for k, v in it:
...         r.append((v, v**2, v**3))
...     return r
...
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
abstract flatMap(func)
apply a flat func to each data of table
- Parameters
func (typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]) – a flat function that accepts two parameters and returns a list of pairs
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
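As a rough local picture of what "flat" means here, the sketch below concatenates the lists returned by func for each pair. The function flat_map is hypothetical and ignores partitioning and key collisions, which the real table may handle differently.

```python
from typing import Callable, Iterable, List, Tuple

Pair = Tuple[object, object]


def flat_map(pairs: Iterable[Pair], func: Callable[[object, object], List[Pair]]) -> List[Pair]:
    """Hypothetical local model of flatMap: concatenate the lists func returns."""
    out: List[Pair] = []
    for k, v in pairs:
        # Each input pair may contribute zero, one, or many output pairs.
        out.extend(func(k, v))
    return out


flat = flat_map([(1, 1), (2, 2)], lambda x, y: [(x, y), (x + 10, y ** 2)])
```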
abstract reduce(func)
reduces all values in the pairs of the table by a binary function func
- Parameters
func (typing.Callable[[object, object], object]) – binary function reducing two values into one
Notes
func should be associative
- Returns
a single object
- Return type
object
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
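One plausible reason for the associativity note, assumed here rather than stated in the source, is that values are reduced within each partition first and the partial results are combined afterwards. The plain-Python sketch below uses a hypothetical reduce_by_partition helper to show that an associative function gives the same answer for any split, while a non-associative one does not.

```python
from functools import reduce


def reduce_by_partition(partitions, func):
    """Hypothetical local model: reduce each partition, then combine the partials."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


values = list(range(10))
split_a = [values[:5], values[5:]]
split_b = [values[:3], values[3:]]

# Addition is associative: the grouping does not change the result.
sum_a = reduce_by_partition(split_a, lambda x, y: x + y)
sum_b = reduce_by_partition(split_b, lambda x, y: x + y)

# Subtraction is not: different groupings give different answers.
diff_a = reduce_by_partition(split_a, lambda x, y: x - y)
diff_b = reduce_by_partition(split_b, lambda x, y: x - y)
```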
abstract glom()
coalesces all data within partition into a list
- Returns
list containing all coalesced partitions and their elements. The first element of each tuple is the key of the last element of each partition.
- Return type
list
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
abstract sample(*, fraction: Optional[float] = None, num: Optional[int] = None, seed=None)
return a sampled subset of this table.
- Parameters
fraction (float) – Expected size of the sample as a fraction of this table’s size. Without replacement: probability that each element is chosen; fraction must be in [0, 1]. With replacement: expected number of times each element is chosen.
num (int) – Exact number of elements to sample from this table
seed (int) – Seed of the random number generator. The current timestamp is used when None is passed.
Notes
use one of fraction and num, not both
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
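The "probability that each element is chosen" reading of fraction can be sketched in plain Python as independent per-element draws. The helper sample_fraction is hypothetical; the actual backends may sample differently, so the size of the result is only expected, not guaranteed, to be near fraction times the table size.

```python
import random
from typing import List, Optional, Tuple

Pair = Tuple[object, object]


def sample_fraction(pairs: List[Pair], fraction: float, seed: Optional[int] = None) -> List[Pair]:
    """Hypothetical local model: keep each pair independently with probability `fraction`."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)
    # random() is in [0.0, 1.0), so fraction=0 keeps nothing and fraction=1 keeps all.
    return [pair for pair in pairs if rng.random() < fraction]


table = [(i, i) for i in range(100)]
picked = sample_fraction(table, fraction=0.1, seed=81)
again = sample_fraction(table, fraction=0.1, seed=81)
```

Fixing the seed makes the draw repeatable, which is why the doctest above can assert a range on the count.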
abstract filter(func)
returns a new table containing only those keys which satisfy a predicate passed in via func.
- Parameters
func (typing.Callable[[object, object], bool]) – Predicate function returning a boolean.
- Returns
A new table containing results.
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
abstract join(other, func)
returns intersection of this table and the other table.
function func will be applied to values of keys that exist in both tables.
- Parameters
other (CTableABC) – another table to be operated with.
func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables.
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
abstract union(other, func=<function CTableABC.<lambda>>)
returns union of this table and the other table.
function func will be applied to values of keys that exist in both tables.
- Parameters
other (CTableABC) – another table to be operated with.
func (typing.Callable[[object, object], object]) – the function applied to values whose key exists in both tables. By default the left table’s value is used.
- Returns
a new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
abstract subtractByKey(other)
returns a new table containing elements only in this table but not in the other table.
- Parameters
other (CTableABC) – another table whose keys are removed from this table.
- Returns
A new table
- Return type
CTableABC
Examples
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
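The three key-based operations (join, union, subtractByKey) differ only in which keys survive. The sketch below models tables as plain dicts to make that contrast concrete. The functions are hypothetical stand-ins, and the sketch assumes join keeps only keys present in both tables.

```python
from operator import add
from typing import Callable, Dict


def join(left: Dict, right: Dict, func: Callable) -> Dict:
    # Keep only keys present in both tables; merge their values with func.
    return {k: func(left[k], right[k]) for k in left if k in right}


def union(left: Dict, right: Dict, func: Callable = lambda v1, v2: v1) -> Dict:
    # Keep every key; func resolves keys present in both (default: left value).
    merged = dict(right)
    for k, v in left.items():
        merged[k] = func(v, right[k]) if k in right else v
    return merged


def subtract_by_key(left: Dict, right: Dict) -> Dict:
    # Keep only keys of the left table that the right table lacks.
    return {k: v for k, v in left.items() if k not in right}


a = {0: 1, 1: 2, 2: 3}
b = {1: 1, 2: 2, 3: 3}
joined = join(a, b, add)
unioned = union(a, b, add)
remaining = subtract_by_key(a, b)
```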
class CSessionABC
computing session to load/create/clean tables
Methods:
cleanup(name, namespace) – delete table(s)
load(address, partitions, schema, **kwargs) – load a table from given address
parallelize(data, partition, include_key, …) – create table from iterable data
Attributes:
session_id – get computing session id
abstract load(address: fate_arch.abc._address.AddressABC, partitions, schema: dict, **kwargs) → Union[fate_arch.abc._path.PathABC, fate_arch.abc._computing.CTableABC]
load a table from given address
- Parameters
address (AddressABC) – address to load table from
partitions (int) – number of partitions of loaded table
schema (dict) – schema associated with this table
- Returns
a table in memory
- Return type
Union[PathABC, CTableABC]
abstract parallelize(data: collections.abc.Iterable, partition: int, include_key: bool, **kwargs) → fate_arch.abc._computing.CTableABC
create table from iterable data
- Parameters
data (Iterable) – data to create table from
partition (int) – number of partitions of created table
include_key (bool) – True to create the table directly from data, whose items are then used as (key, value) pairs; False to create the table with generated keys starting from 0
- Returns
a table created from data
- Return type
CTableABC
abstract cleanup(name, namespace)
delete table(s)
- Parameters
name (str) – table name or wildcard character
namespace (str) – namespace
abstract property session_id
get computing session id
- Returns
computing session id
- Return type
str