Computing API¶
Most of the time, users of federatedml do not need to know how to initialize a computing session, because FATE Flow already handles this for you. The exception is when you are writing unit tests that involve CTable-related functions. To initialize a computing session:
from fate_arch.session import computing_session
# initialize
computing_session.init(session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)
computing session¶
computing_session
¶
Source code in fate_arch/session/_session.py
class computing_session(object):
@staticmethod
def init(session_id, options=None):
Session(options=options).as_global().init_computing(session_id)
@staticmethod
def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
return get_computing_session().parallelize(data, partition=partition, include_key=include_key, **kwargs)
@staticmethod
def stop():
return get_computing_session().stop()
init(session_id, options=None)
staticmethod
¶
Source code in fate_arch/session/_session.py
@staticmethod
def init(session_id, options=None):
Session(options=options).as_global().init_computing(session_id)
parallelize(data, partition, include_key, **kwargs)
staticmethod
¶
Source code in fate_arch/session/_session.py
@staticmethod
def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
return get_computing_session().parallelize(data, partition=partition, include_key=include_key, **kwargs)
stop()
staticmethod
¶
Source code in fate_arch/session/_session.py
@staticmethod
def stop():
return get_computing_session().stop()
computing table¶
After creating a table with the computing session, many distributed computing APIs are available on it:
CTableABC
¶
a table of pair-like data supports distributed processing
Source code in fate_arch/abc/_computing.py
class CTableABC(metaclass=ABCMeta):
"""
a table of pair-like data supports distributed processing
"""
@property
@abc.abstractmethod
def engine(self):
"""
get the engine name of table
Returns
-------
str
engine name of the table
"""
...
@property
@abc.abstractmethod
def partitions(self):
"""
get the partitions of table
Returns
-------
int
number of partitions
"""
...
@abc.abstractmethod
def copy(self):
...
@abc.abstractmethod
def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
"""
save table
Parameters
----------
address: AddressABC
address to save table to
partitions: int
number of partitions to save as
schema: dict
table schema
"""
...
@abc.abstractmethod
def collect(self, **kwargs) -> typing.Generator:
"""
collect data from table
Returns
-------
generator
generator of data
Notes
------
no order guarantee
"""
...
@abc.abstractmethod
def take(self, n=1, **kwargs):
"""
take ``n`` data from table
Parameters
----------
n: int
number of data to take
Returns
-------
list
a list of ``n`` data
Notes
------
no order guarantee
"""
...
@abc.abstractmethod
def first(self, **kwargs):
"""
take one data from table
Returns
-------
object
a data from table
Notes
-------
no order guarantee
"""
...
@abc.abstractmethod
def count(self) -> int:
"""
number of data in table
Returns
-------
int
number of data
"""
...
@abc.abstractmethod
def map(self, func) -> 'CTableABC':
"""
apply `func` to each data
Parameters
----------
func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
function map (k1, v1) to (k2, v2)
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[("k1", 1), ("k2", 4), ("k3", 9)]
"""
...
@abc.abstractmethod
def mapValues(self, func):
"""
apply `func` to each value of data
Parameters
----------
func: ``typing.Callable[[object], object]``
map v1 to v2
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
"""
...
@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
"""
apply ``func`` to each partition of table
Parameters
----------
func: ``typing.Callable[[iter], list]``
accept an iterator of pair, return a list of pair
use_previous_behavior: bool
this parameter is provided for compatible reason, if set True, call this func will call ``applyPartitions`` instead
preserves_partitioning: bool
flag indicate whether the `func` will preserve partition
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
... s = 0
... for k, v in iterator:
... s += v
... return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
"""
...
@abc.abstractmethod
def mapReducePartitions(self, mapper, reducer, **kwargs):
"""
apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`
Parameters
----------
mapper: ``typing.Callable[[iter], list]``
accept an iterator of pair, return a list of pair
reducer: ``typing.Callable[[object, object], object]``
reduce v1, v2 to v3
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
... r = []
... for k, v in it:
... r.append((k % 3, v**2))
... r.append((k % 2, v ** 3))
... return r
>>> def _reducer(a, b):
... return a + b
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
"""
...
def applyPartitions(self, func):
"""
apply ``func`` to each partitions as a single object
Parameters
----------
func: ``typing.Callable[[iter], object]``
accept a iterator, return a object
Returns
-------
CTableABC
a new table, with each partition contains a single key-value pair
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
... r = []
... for k, v in it:
... r.append((v, v**2, v**3))
... return r
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
"""
...
@abc.abstractmethod
def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
...
@abc.abstractmethod
def flatMap(self, func):
"""
apply a flat ``func`` to each data of table
Parameters
----------
func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
a flat function accept two parameters return a list of pair
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
"""
...
@abc.abstractmethod
def reduce(self, func):
"""
reduces all value in pair of table by a binary function `func`
Parameters
----------
func: typing.Callable[[object, object], object]
binary function reduce two value into one
Returns
-------
object
a single object
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
Notes
------
`func` should be associative
"""
...
@abc.abstractmethod
def glom(self):
"""
coalesces all data within partition into a list
Returns
-------
list
list containing all coalesced partition and its elements.
First element of each tuple is chosen from key of last element of each partition.
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
"""
...
@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
"""
return a sampled subset of this Table.
Parameters
----------
fraction: float
Expected size of the sample as a fraction of this table's size
without replacement: probability that each element is chosen.
Fraction must be [0, 1] with replacement: expected number of times each element is chosen.
num: int
Exact number of the sample from this table's size
seed: int
Seed of the random number generator. Use current timestamp when `None` is passed.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
Notes
-------
use one of ``fraction`` and ``num``, not both
"""
...
@abc.abstractmethod
def filter(self, func):
"""
returns a new table containing only those keys which satisfy a predicate passed in via ``func``.
Parameters
----------
func: typing.Callable[[object, object], bool]
Predicate function returning a boolean.
Returns
-------
CTableABC
A new table containing results.
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
"""
...
@abc.abstractmethod
def join(self, other, func):
"""
returns intersection of this table and the other table.
function ``func`` will be applied to values of keys that exist in both table.
Parameters
----------
other: CTableABC
another table to be operated with.
func: ``typing.Callable[[object, object], object]``
the function applying to values whose key exists in both tables.
default using left table's value.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
"""
...
@abc.abstractmethod
def union(self, other, func=lambda v1, v2: v1):
"""
returns union of this table and the other table.
function ``func`` will be applied to values of keys that exist in both table.
Parameters
----------
other: CTableABC
another table to be operated with.
func: ``typing.Callable[[object, object], object]``
The function applying to values whose key exists in both tables.
default using left table's value.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
"""
...
@abc.abstractmethod
def subtractByKey(self, other):
"""
returns a new table containing elements only in this table but not in the other table.
Parameters
----------
other: CTableABC
Another table to be subtractbykey with.
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
"""
...
@property
def schema(self):
if not hasattr(self, "_schema"):
setattr(self, "_schema", {})
return getattr(self, "_schema")
@schema.setter
def schema(self, value):
setattr(self, "_schema", value)
Attributes¶
schema
property
writable
¶
Methods¶
copy(self)
¶
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def copy(self):
...
save(self, address, partitions, schema, **kwargs)
¶
save table
Parameters¶
address: AddressABC address to save table to partitions: int number of partitions to save as schema: dict table schema
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
"""
save table
Parameters
----------
address: AddressABC
address to save table to
partitions: int
number of partitions to save as
schema: dict
table schema
"""
...
collect(self, **kwargs)
¶
collect data from table
Returns¶
generator generator of data
Notes¶
no order guarantee
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def collect(self, **kwargs) -> typing.Generator:
"""
collect data from table
Returns
-------
generator
generator of data
Notes
------
no order guarantee
"""
...
take(self, n=1, **kwargs)
¶
take n
data from table
Parameters¶
n: int number of data to take
Returns¶
list
a list of n
data
Notes¶
no order guarantee
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def take(self, n=1, **kwargs):
"""
take ``n`` data from table
Parameters
----------
n: int
number of data to take
Returns
-------
list
a list of ``n`` data
Notes
------
no order guarantee
"""
...
first(self, **kwargs)
¶
count(self)
¶
number of data in table
Returns¶
int number of data
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def count(self) -> int:
"""
number of data in table
Returns
-------
int
number of data
"""
...
map(self, func)
¶
apply func
to each data
Parameters¶
func: typing.Callable[[object, object], typing.Tuple[object, object]]
function map (k1, v1) to (k2, v2)
Returns¶
CTableABC A new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2) b = a.map(lambda k, v: (k, v**2)) list(b.collect()) [("k1", 1), ("k2", 4), ("k3", 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def map(self, func) -> 'CTableABC':
"""
apply `func` to each data
Parameters
----------
func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
function map (k1, v1) to (k2, v2)
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[("k1", 1), ("k2", 4), ("k3", 9)]
"""
...
mapValues(self, func)
¶
apply func
to each value of data
Parameters¶
func: typing.Callable[[object], object]
map v1 to v2
Returns¶
CTableABC A new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2) b = a.mapValues(lambda x: len(x)) list(b.collect()) [('a', 3), ('b', 1)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapValues(self, func):
"""
apply `func` to each value of data
Parameters
----------
func: ``typing.Callable[[object], object]``
map v1 to v2
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
"""
...
mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False)
¶
apply func
to each partition of table
Parameters¶
func: typing.Callable[[iter], list]
accept an iterator of pair, return a list of pair
use_previous_behavior: bool
this parameter is provided for compatible reason, if set True, call this func will call applyPartitions
instead
preserves_partitioning: bool
flag indicate whether the func
will preserve partition
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2) def f(iterator): ... s = 0 ... for k, v in iterator: ... s += v ... return [(s, s)] ... b = a.mapPartitions(f) list(b.collect()) [(6, 6), (9, 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
"""
apply ``func`` to each partition of table
Parameters
----------
func: ``typing.Callable[[iter], list]``
accept an iterator of pair, return a list of pair
use_previous_behavior: bool
this parameter is provided for compatible reason, if set True, call this func will call ``applyPartitions`` instead
preserves_partitioning: bool
flag indicate whether the `func` will preserve partition
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
... s = 0
... for k, v in iterator:
... s += v
... return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
"""
...
mapReducePartitions(self, mapper, reducer, **kwargs)
¶
apply mapper
to each partition of table and then perform reduce by key operation with reducer
Parameters¶
mapper: typing.Callable[[iter], list]
accept an iterator of pair, return a list of pair
reducer: typing.Callable[[object, object], object]
reduce v1, v2 to v3
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2) def _mapper(it): ... r = [] ... for k, v in it: ... r.append((k % 3, v**2)) ... r.append((k % 2, v ** 3)) ... return r def _reducer(a, b): ... return a + b output = table.mapReducePartitions(_mapper, _reducer) collected = dict(output.collect()) assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2 assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2 assert collected[2] == 3 ** 2
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapReducePartitions(self, mapper, reducer, **kwargs):
"""
apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`
Parameters
----------
mapper: ``typing.Callable[[iter], list]``
accept an iterator of pair, return a list of pair
reducer: ``typing.Callable[[object, object], object]``
reduce v1, v2 to v3
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
... r = []
... for k, v in it:
... r.append((k % 3, v**2))
... r.append((k % 2, v ** 3))
... return r
>>> def _reducer(a, b):
... return a + b
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
"""
...
applyPartitions(self, func)
¶
apply func
to each partitions as a single object
Parameters¶
func: typing.Callable[[iter], object]
accept a iterator, return a object
Returns¶
CTableABC a new table, with each partition contains a single key-value pair
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False) def f(it): ... r = [] ... for k, v in it: ... r.append((v, v**2, v**3)) ... return r output = a.applyPartitions(f) assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
Source code in fate_arch/abc/_computing.py
def applyPartitions(self, func):
"""
apply ``func`` to each partitions as a single object
Parameters
----------
func: ``typing.Callable[[iter], object]``
accept a iterator, return a object
Returns
-------
CTableABC
a new table, with each partition contains a single key-value pair
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
... r = []
... for k, v in it:
... r.append((v, v**2, v**3))
... return r
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
"""
...
mapPartitionsWithIndex(self, func, preserves_partitioning=False)
¶
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
...
flatMap(self, func)
¶
apply a flat func
to each data of table
Parameters¶
func: typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]
a flat function accept two parameters return a list of pair
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2) b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)]) c = list(b.collect()) assert len(c) == 4 assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def flatMap(self, func):
"""
apply a flat ``func`` to each data of table
Parameters
----------
func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
a flat function accept two parameters return a list of pair
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
"""
...
reduce(self, func)
¶
reduces all value in pair of table by a binary function func
Parameters¶
func: typing.Callable[[object, object], object] binary function reduce two value into one
Returns¶
object a single object
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize(range(100), include_key=False, partition=4) assert a.reduce(lambda x, y: x + y) == sum(range(100))
Notes¶
func
should be associative
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def reduce(self, func):
"""
reduces all value in pair of table by a binary function `func`
Parameters
----------
func: typing.Callable[[object, object], object]
binary function reduce two value into one
Returns
-------
object
a single object
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
Notes
------
`func` should be associative
"""
...
glom(self)
¶
coalesces all data within partition into a list
Returns¶
list list containing all coalesced partition and its elements. First element of each tuple is chosen from key of last element of each partition.
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect() list(a) [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def glom(self):
"""
coalesces all data within partition into a list
Returns
-------
list
list containing all coalesced partition and its elements.
First element of each tuple is chosen from key of last element of each partition.
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
"""
...
sample(self, *, fraction=None, num=None, seed=None)
¶
return a sampled subset of this Table.
Parameters¶
fraction: float
Expected size of the sample as a fraction of this table's size
Without replacement: probability that each element is chosen; fraction must be in [0, 1].
With replacement: expected number of times each element is chosen.
num: int
Exact number of the sample from this table's size
seed: int
Seed of the random number generator. Use current timestamp when None
is passed.
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session x = computing_session.parallelize(range(100), include_key=False, partition=4) 6 <= x.sample(fraction=0.1, seed=81).count() <= 14 True
Notes¶
use one of fraction
and num
, not both
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
"""
return a sampled subset of this Table.
Parameters
----------
fraction: float
Expected size of the sample as a fraction of this table's size
without replacement: probability that each element is chosen.
Fraction must be [0, 1] with replacement: expected number of times each element is chosen.
num: int
Exact number of the sample from this table's size
seed: int
Seed of the random number generator. Use current timestamp when `None` is passed.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
Notes
-------
use one of ``fraction`` and ``num``, not both
"""
...
filter(self, func)
¶
returns a new table containing only those keys which satisfy a predicate passed in via func
.
Parameters¶
func: typing.Callable[[object, object], bool] Predicate function returning a boolean.
Returns¶
CTableABC A new table containing results.
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2) b = a.filter(lambda k, v : k % 2 == 0) list(b.collect()) [(0, 0), (2, 2)] c = a.filter(lambda k, v : v % 2 != 0) list(c.collect()) [(1, 1)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def filter(self, func):
"""
returns a new table containing only those keys which satisfy a predicate passed in via ``func``.
Parameters
----------
func: typing.Callable[[object, object], bool]
Predicate function returning a boolean.
Returns
-------
CTableABC
A new table containing results.
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
"""
...
join(self, other, func)
¶
returns intersection of this table and the other table.
function func
will be applied to values of keys that exist in both table.
Parameters¶
other: CTableABC
another table to be operated with.
func: typing.Callable[[object, object], object]
the function applying to values whose key exists in both tables.
default using left table's value.
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)] b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2) c = a.join(b, lambda v1, v2 : v1 + v2) list(c.collect()) [(1, 3), (2, 5)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def join(self, other, func):
"""
returns intersection of this table and the other table.
function ``func`` will be applied to values of keys that exist in both table.
Parameters
----------
other: CTableABC
another table to be operated with.
func: ``typing.Callable[[object, object], object]``
the function applying to values whose key exists in both tables.
default using left table's value.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
"""
...
union(self, other, func=lambda v1, v2: v1)
¶
returns union of this table and the other table.
function func
will be applied to values of keys that exist in both table.
Parameters¶
other: CTableABC
another table to be operated with.
func: typing.Callable[[object, object], object]
The function applying to values whose key exists in both tables.
default using left table's value.
Returns¶
CTableABC a new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)] b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2) c = a.union(b, lambda v1, v2 : v1 + v2) list(c.collect()) [(0, 1), (1, 3), (2, 5), (3, 3)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def union(self, other, func=lambda v1, v2: v1):
"""
returns union of this table and the other table.
function ``func`` will be applied to values of keys that exist in both table.
Parameters
----------
other: CTableABC
another table to be operated with.
func: ``typing.Callable[[object, object], object]``
The function applying to values whose key exists in both tables.
default using left table's value.
Returns
-------
CTableABC
a new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2) # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
"""
...
subtractByKey(self, other)
¶
returns a new table containing elements only in this table but not in the other table.
Parameters¶
other: CTableABC Another table to be subtractbykey with.
Returns¶
CTableABC A new table
Examples¶
from fate_arch.session import computing_session a = computing_session.parallelize(range(10), include_key=False, partition=2) b = computing_session.parallelize(range(5), include_key=False, partition=2) c = a.subtractByKey(b) list(c.collect()) [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def subtractByKey(self, other):
"""
returns a new table containing elements only in this table but not in the other table.
Parameters
----------
other: CTableABC
Another table to be subtractbykey with.
Returns
-------
CTableABC
A new table
Examples
--------
>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
"""
...