Computing API

Most of the time, users of federatedml do not need to initialize a computing session themselves, because FATE Flow already does this for you. The exception is when you are writing unit tests that involve CTable-related functions. To initialize a computing session:

from fate_arch.session import computing_session
# initialize
computing_session.init(session_id="a great session")
# create a table from iterable data
table = computing_session.parallelize(range(100), include_key=False, partition=2)

computing session

computing_session

Bases: object

Functions

init(session_id, options=None) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def init(session_id, options=None):
    Session(options=options).as_global().init_computing(session_id)
parallelize(data, partition, include_key, **kwargs) staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def parallelize(data: typing.Iterable, partition: int, include_key: bool, **kwargs) -> CTableABC:
    return get_computing_session().parallelize(data, partition=partition, include_key=include_key, **kwargs)
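
The effect of include_key can be pictured in plain Python. The sketch below is not FATE code: to_pairs is a hypothetical helper, and it assumes (in line with the comments in the join and union examples on this page) that with include_key=False each item is keyed by its position in the input.

```python
from typing import Any, Iterable, List, Tuple


def to_pairs(data: Iterable[Any], include_key: bool) -> List[Tuple[Any, Any]]:
    """Hypothetical model of how input items become key-value pairs."""
    if include_key:
        # Each item is already a (key, value) pair.
        return [(k, v) for k, v in data]
    # Assumption: without keys, each item is keyed by its position.
    return list(enumerate(data))


print(to_pairs([1, 2, 3], include_key=False))  # [(0, 1), (1, 2), (2, 3)]
print(to_pairs([("k1", 1), ("k2", 2)], include_key=True))  # [('k1', 1), ('k2', 2)]
```
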
stop() staticmethod
Source code in fate_arch/session/_session.py
@staticmethod
def stop():
    return get_computing_session().stop()

computing table

After creating a table with a computing session, many distributed computing APIs are available on it.

CTableABC

a table of pair-like data that supports distributed processing

Attributes

engine abstractmethod property

get the engine name of table

Returns:

Type Description
str

name of the computing engine

partitions abstractmethod property

get the partitions of table

Returns:

Type Description
int

number of partitions

schema writable property

Functions

copy() abstractmethod
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def copy(self):
    ...
save(address, partitions, schema, **kwargs) abstractmethod

save table

Parameters:

Name Type Description Default
address AddressABC

address to save table to

required
partitions int

number of partitions to save as

required
schema dict

table schema

required
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def save(self, address: AddressABC, partitions: int, schema: dict, **kwargs):
    """
    save table

    Parameters
    ----------
    address: AddressABC
       address to save table to
    partitions: int
       number of partitions to save as
    schema: dict
       table schema
    """
    ...
collect(**kwargs) abstractmethod

collect data from table

Returns:

Type Description
generator

generator of data

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def collect(self, **kwargs) -> typing.Generator:
    """
    collect data from table

    Returns
    -------
    generator
       generator of data

    Notes
    ------
    no order guarantee
    """
    ...
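
Because collect gives no order guarantee, a unit test is usually more robust when it compares results in an order-insensitive way. The sketch below illustrates the idea without FATE: fake_collect is a hypothetical stand-in for table.collect() that yields the pairs in a shuffled order.

```python
import random
from typing import Iterator, List, Tuple

Pair = Tuple[int, int]


def fake_collect(pairs: List[Pair], seed: int) -> Iterator[Pair]:
    """Hypothetical stand-in for table.collect(): yields pairs in arbitrary order."""
    shuffled = list(pairs)
    random.Random(seed).shuffle(shuffled)
    yield from shuffled


expected = [(0, 0), (1, 1), (2, 4), (3, 9)]

# Order-insensitive comparisons: sort by key, or build a dict.
assert sorted(fake_collect(expected, seed=1)) == expected
assert dict(fake_collect(expected, seed=2)) == dict(expected)
```

The same applies to take and first: assert on membership or on counts rather than on a specific element.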
take(n=1, **kwargs) abstractmethod

take n data from table

Parameters:

Name Type Description Default
n

number of data to take

1

Returns:

Type Description
list

a list of n data

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def take(self, n=1, **kwargs):
    """
    take ``n`` data from table

    Parameters
    ----------
    n: int
      number of data to take

    Returns
    -------
    list
       a list of ``n`` data

    Notes
    ------
    no order guarantee
    """
    ...
first(**kwargs) abstractmethod

take one data from table

Returns:

Type Description
object

a data from table

Notes

no order guarantee

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def first(self, **kwargs):
    """
    take one data from table

    Returns
    -------
    object
      a data from table


    Notes
    -------
    no order guarantee
    """
    ...
count() abstractmethod

number of data in table

Returns:

Type Description
int

number of data

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def count(self) -> int:
    """
    number of data in table

    Returns
    -------
    int
       number of data
    """
    ...
map(func) abstractmethod

apply func to each data

Parameters:

Name Type Description Default
func

function map (k1, v1) to (k2, v2)

required

Returns:

Type Description
CTableABC

A new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
>>> b = a.map(lambda k, v: (k, v**2))
>>> list(b.collect())
[("k1", 1), ("k2", 4), ("k3", 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def map(self, func) -> 'CTableABC':
    """
    apply `func` to each data

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.Tuple[object, object]]``
       function map (k1, v1) to (k2, v2)

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('k1', 1), ('k2', 2), ('k3', 3)], include_key=True, partition=2)
    >>> b = a.map(lambda k, v: (k, v**2))
    >>> list(b.collect())
    [("k1", 1), ("k2", 4), ("k3", 9)]
    """
    ...
mapValues(func) abstractmethod

apply func to each value of data

Parameters:

Name Type Description Default
func

map v1 to v2

required

Returns:

Type Description
CTableABC

A new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
>>> b = a.mapValues(lambda x: len(x))
>>> list(b.collect())
[('a', 3), ('b', 1)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapValues(self, func):
    """
    apply `func` to each value of data

    Parameters
    ----------
    func: ``typing.Callable[[object], object]``
       map v1 to v2

    Returns
    -------
    CTableABC
       A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([('a', ['apple', 'banana', 'lemon']), ('b', ['grapes'])], include_key=True, partition=2)
    >>> b = a.mapValues(lambda x: len(x))
    >>> list(b.collect())
    [('a', 3), ('b', 1)]
    """
    ...
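
The difference between map and mapValues can be sketched in plain Python. This is a single-process model, not FATE's implementation; map_pairs and map_values are hypothetical names used only for illustration.

```python
from typing import Any, Callable, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def map_pairs(pairs: Iterable[Pair], func: Callable[[Any, Any], Pair]) -> List[Pair]:
    """Hypothetical model of map: func receives key and value, returns a new pair."""
    return [func(k, v) for k, v in pairs]


def map_values(pairs: Iterable[Pair], func: Callable[[Any], Any]) -> List[Pair]:
    """Hypothetical model of mapValues: func receives only the value; keys are kept."""
    return [(k, func(v)) for k, v in pairs]


data = [("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])]
lengths = map_values(data, len)                    # keys unchanged
swapped = map_pairs(lengths, lambda k, v: (v, k))  # keys replaced
print(lengths)  # [('a', 3), ('b', 1)]
print(swapped)  # [(3, 'a'), (1, 'b')]
```
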
mapPartitions(func, use_previous_behavior=True, preserves_partitioning=False) abstractmethod

apply func to each partition of table

Parameters:

Name Type Description Default
func

accept an iterator of pair, return a list of pair

required
use_previous_behavior

provided for backward compatibility: if True, calling this method calls applyPartitions instead

True
preserves_partitioning

flag indicating whether func preserves the partitioning

False

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
>>> def f(iterator):
...     s = 0
...     for k, v in iterator:
...         s += v
...     return [(s, s)]
...
>>> b = a.mapPartitions(f)
>>> list(b.collect())
[(6, 6), (9, 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitions(self, func, use_previous_behavior=True, preserves_partitioning=False):
    """
    apply ``func`` to each partition of table

    Parameters
    ----------
    func: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    use_previous_behavior: bool
       this parameter is provided for compatible reason, if set True, call this func will call ``applyPartitions`` instead
    preserves_partitioning: bool
       flag indicate whether the `func` will preserve partition

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3, 4, 5], include_key=False, partition=2)
    >>> def f(iterator):
    ...     s = 0
    ...     for k, v in iterator:
    ...         s += v
    ...     return [(s, s)]
    ...
    >>> b = a.mapPartitions(f)
    >>> list(b.collect())
    [(6, 6), (9, 9)]
    """
    ...
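
What "apply func to each partition" means can be modeled with explicit lists. The sketch below is a plain-Python approximation, not FATE code: map_partitions is a hypothetical helper, and the partition layout is assumed, since a real engine decides where each key lives.

```python
from typing import Any, Callable, Iterable, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def map_partitions(
    partitions: List[List[Pair]],
    func: Callable[[Iterator[Pair]], Iterable[Pair]],
) -> List[List[Pair]]:
    """Hypothetical model: call func once per partition with an iterator of its pairs."""
    return [list(func(iter(part))) for part in partitions]


def sum_values(iterator: Iterator[Pair]) -> List[Pair]:
    total = sum(v for _, v in iterator)
    return [(total, total)]


# Assumed layout of the pairs (0, 1) .. (4, 5) over two partitions.
parts = [[(0, 1), (1, 2), (2, 3)], [(3, 4), (4, 5)]]
result = map_partitions(parts, sum_values)
print(result)  # [[(6, 6)], [(9, 9)]]
```

With a different layout the per-partition sums would differ, which is why results of partition-level functions should not be relied on unless the function is insensitive to how the data is split.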
mapReducePartitions(mapper, reducer, **kwargs) abstractmethod

apply mapper to each partition of table and then perform reduce by key operation with reducer

Parameters:

Name Type Description Default
mapper

accept an iterator of pair, return a list of pair

required
reducer

reduce v1, v2 to v3

required

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
>>> def _mapper(it):
...     r = []
...     for k, v in it:
...         r.append((k % 3, v**2))
...         r.append((k % 2, v ** 3))
...     return r
>>> def _reducer(a, b):
...     return a + b
>>> output = table.mapReducePartitions(_mapper, _reducer)
>>> collected = dict(output.collect())
>>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
>>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
>>> assert collected[2] == 3 ** 2
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapReducePartitions(self, mapper, reducer, **kwargs):
    """
    apply ``mapper`` to each partition of table and then perform reduce by key operation with `reducer`

    Parameters
    ----------
    mapper: ``typing.Callable[[iter], list]``
       accept an iterator of pair, return a list of pair
    reducer: ``typing.Callable[[object, object], object]``
       reduce v1, v2 to v3

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> table = computing_session.parallelize([(1, 2), (2, 3), (3, 4), (4, 5)], include_key=True, partition=2)
    >>> def _mapper(it):
    ...     r = []
    ...     for k, v in it:
    ...         r.append((k % 3, v**2))
    ...         r.append((k % 2, v ** 3))
    ...     return r
    >>> def _reducer(a, b):
    ...     return a + b
    >>> output = table.mapReducePartitions(_mapper, _reducer)
    >>> collected = dict(output.collect())
    >>> assert collected[0] == 3 ** 3 + 5 ** 3 + 4 ** 2
    >>> assert collected[1] == 2 ** 3 + 4 ** 3 + 2 ** 2 + 5 ** 2
    >>> assert collected[2] == 3 ** 2
    """

    ...
applyPartitions(func)

apply func to each partition as a whole, producing a single object per partition

Parameters:

Name Type Description Default
func

accepts an iterator, returns an object

required

Returns:

Type Description
CTableABC

a new table, with each partition contains a single key-value pair

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
>>> def f(it):
...    r = []
...    for k, v in it:
...        r.append((v, v**2, v**3))
...    return r
>>> output = a.applyPartitions(f)
>>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
Source code in fate_arch/abc/_computing.py
def applyPartitions(self, func):
    """
    apply ``func`` to each partitions as a single object

    Parameters
    ----------
    func: ``typing.Callable[[iter], object]``
       accept a iterator, return a object

    Returns
    -------
    CTableABC
       a new table, with each partition contains a single key-value pair

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], partition=3, include_key=False)
    >>> def f(it):
    ...    r = []
    ...    for k, v in it:
    ...        r.append((v, v**2, v**3))
    ...    return r
    >>> output = a.applyPartitions(f)
    >>> assert (2, 2**2, 2**3) in [v[0] for _, v in output.collect()]
    """
    ...
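
The shape of the result (one key-value pair per partition, whose value is whatever func returned) can be sketched as follows. This is a plain-Python model and not FATE code; apply_partitions is a hypothetical helper, and using the partition index as the output key is purely an assumption, since the key is not specified here.

```python
from typing import Any, Callable, Iterator, List, Tuple

Pair = Tuple[Any, Any]


def apply_partitions(
    partitions: List[List[Pair]],
    func: Callable[[Iterator[Pair]], Any],
) -> List[Pair]:
    """Hypothetical model: one output pair per partition, holding func's result."""
    # Assumption: the partition index stands in for whatever key the engine assigns.
    return [(index, func(iter(part))) for index, part in enumerate(partitions)]


def powers(it: Iterator[Pair]) -> List[Tuple[int, int, int]]:
    return [(v, v ** 2, v ** 3) for _, v in it]


# Assumed layout: three partitions with one pair each.
parts = [[(0, 1)], [(1, 2)], [(2, 3)]]
output = apply_partitions(parts, powers)
print(output)  # [(0, [(1, 1, 1)]), (1, [(2, 4, 8)]), (2, [(3, 9, 27)])]
```
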
mapPartitionsWithIndex(func, preserves_partitioning=False) abstractmethod
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def mapPartitionsWithIndex(self, func, preserves_partitioning=False):
    ...
flatMap(func) abstractmethod

apply a flat func to each data of table

Parameters:

Name Type Description Default
func

a flat function accept two parameters return a list of pair

required

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
>>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
>>> c = list(b.collect())
>>> assert len(c) == 4
>>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def flatMap(self, func):
    """
    apply a flat ``func`` to each data of table

    Parameters
    ----------
    func: ``typing.Callable[[object, object], typing.List[typing.Tuple[object, object]]]``
       a flat function accept two parameters return a list of pair

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([(1, 1), (2, 2)], include_key=True, partition=2)
    >>> b = a.flatMap(lambda x, y: [(x, y), (x + 10, y ** 2)])
    >>> c = list(b.collect())
    >>> assert len(c) == 4
    >>> assert ((1, 1) in c) and ((2, 2) in c) and ((11, 1) in c) and ((12, 4) in c)
    """
    ...
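
The "flat" part of flatMap means that the lists returned by func are concatenated into one collection of pairs. A minimal plain-Python sketch of that behavior, with flat_map as a hypothetical helper rather than FATE's implementation:

```python
from typing import Any, Callable, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def flat_map(pairs: Iterable[Pair], func: Callable[[Any, Any], List[Pair]]) -> List[Pair]:
    """Hypothetical model: func returns a list of pairs; the lists are flattened."""
    return [pair for k, v in pairs for pair in func(k, v)]


c = flat_map([(1, 1), (2, 2)], lambda x, y: [(x, y), (x + 10, y ** 2)])
print(sorted(c))  # [(1, 1), (2, 2), (11, 1), (12, 4)]
```
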
reduce(func) abstractmethod

reduces all values in the table to a single object using a binary function func

Parameters:

Name Type Description Default
func

binary function that reduces two values into one

required

Returns:

Type Description
object

a single object

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> assert a.reduce(lambda x, y: x + y) == sum(range(100))
Notes

func should be associative

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def reduce(self, func):
    """
    reduces all value in pair of table by a binary function `func`

    Parameters
    ----------
    func: typing.Callable[[object, object], object]
       binary function reduce two value into one

    Returns
    -------
    object
       a single object



    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> assert a.reduce(lambda x, y: x + y) == sum(range(100))

    Notes
    ------
    `func` should be associative
    """
    ...
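
Why func should be associative becomes visible once the reduction is done per partition first. The sketch below assumes such a two-stage reduction (an assumption about how a distributed engine typically proceeds, not a statement about FATE's internals); partitioned_reduce is a hypothetical helper.

```python
import operator
from functools import reduce
from typing import Any, Callable, List


def partitioned_reduce(partitions: List[List[Any]], func: Callable[[Any, Any], Any]) -> Any:
    """Hypothetical model: reduce each partition, then reduce the partial results."""
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


values = [1, 2, 3, 4, 5, 6]
split_a = [values[:3], values[3:]]  # [1, 2, 3] and [4, 5, 6]
split_b = [values[:2], values[2:]]  # [1, 2] and [3, 4, 5, 6]

# Addition is associative: the split does not matter.
print(partitioned_reduce(split_a, operator.add), partitioned_reduce(split_b, operator.add))  # 21 21

# Subtraction is not: the result depends on the split.
print(partitioned_reduce(split_a, operator.sub), partitioned_reduce(split_b, operator.sub))  # 3 11
```
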
glom() abstractmethod

coalesces all data within partition into a list

Returns:

Type Description
list

list containing all coalesced partition and its elements. First element of each tuple is chosen from key of last element of each partition.

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
>>> list(a)
[(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def glom(self):
    """
    coalesces all data within partition into a list

    Returns
    -------
    list
       list containing all coalesced partition and its elements.
       First element of each tuple is chosen from key of last element of each partition.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(5), include_key=False, partition=3).glom().collect()
    >>> list(a)
    [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
    """
    ...
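
The result shape described above can be reproduced with a few lines of plain Python. This is a sketch, not FATE code: glom_model is a hypothetical helper, the partition layout is copied from the example output, and skipping empty partitions is an assumption.

```python
from typing import Any, List, Tuple

Pair = Tuple[Any, Any]


def glom_model(partitions: List[List[Pair]]) -> List[Tuple[Any, List[Pair]]]:
    """Hypothetical model: one (key, list_of_pairs) entry per non-empty partition."""
    # Per the description, the key comes from the last pair in the partition.
    return [(part[-1][0], list(part)) for part in partitions if part]


parts = [[(2, 2)], [(0, 0), (3, 3)], [(1, 1), (4, 4)]]
print(glom_model(parts))
# [(2, [(2, 2)]), (3, [(0, 0), (3, 3)]), (4, [(1, 1), (4, 4)])]
```
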
sample(*, fraction=None, num=None, seed=None) abstractmethod

return a sampled subset of this Table.

Parameters:

Name Type Description Default
fraction typing.Optional[float]

Expected size of the sample as a fraction of this table's size. Without replacement: the probability that each element is chosen; fraction must be in [0, 1]. With replacement: the expected number of times each element is chosen.

None
num typing.Optional[int]

Exact number of the sample from this table's size

None
seed

Seed of the random number generator. Use current timestamp when None is passed.

None

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
>>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
True
Notes

use one of fraction and num, not both

Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def sample(self, *, fraction: typing.Optional[float] = None, num: typing.Optional[int] = None, seed=None):
    """
    return a sampled subset of this Table.
    Parameters
    ----------
    fraction: float
      Expected size of the sample as a fraction of this table's size
      without replacement: probability that each element is chosen.
      Fraction must be [0, 1] with replacement: expected number of times each element is chosen.
    num: int
      Exact number of the sample from this table's size
    seed: int
      Seed of the random number generator. Use current timestamp when `None` is passed.

    Returns
    -------
    CTableABC
       a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> x = computing_session.parallelize(range(100), include_key=False, partition=4)
    >>> 6 <= x.sample(fraction=0.1, seed=81).count() <= 14
    True

    Notes
    -------
    use one of ``fraction`` and ``num``, not both

    """
    ...
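
The difference between fraction and num is that the former gives an approximate sample size and the latter an exact one. The sketch below models both without replacement using only the standard library; sample_fraction and sample_num are hypothetical helpers, and the sampling scheme is an assumption rather than FATE's implementation.

```python
import random
from typing import Any, List, Optional, Sequence, Tuple

Pair = Tuple[Any, Any]


def sample_fraction(pairs: Sequence[Pair], fraction: float, seed: Optional[int] = None) -> List[Pair]:
    """Hypothetical model: keep each pair independently with probability `fraction`."""
    if not 0.0 <= fraction <= 1.0:
        raise ValueError("fraction must be in [0, 1]")
    rng = random.Random(seed)
    # The result size is only approximately fraction * len(pairs).
    return [pair for pair in pairs if rng.random() < fraction]


def sample_num(pairs: Sequence[Pair], num: int, seed: Optional[int] = None) -> List[Pair]:
    """Hypothetical model: draw exactly `num` distinct pairs."""
    return random.Random(seed).sample(list(pairs), num)


table = list(enumerate(range(100)))
approx = sample_fraction(table, fraction=0.1, seed=81)
exact = sample_num(table, num=10, seed=81)
print(len(approx), len(exact))
```

Passing the same seed twice yields the same sample in this model, which is what makes seeded sampling usable in unit tests.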
filter(func) abstractmethod

returns a new table containing only those key-value pairs that satisfy the predicate passed in via func.

Parameters:

Name Type Description Default
func

Predicate function returning a boolean.

required

Returns:

Type Description
CTableABC

A new table containing results.

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
>>> b = a.filter(lambda k, v : k % 2 == 0)
>>> list(b.collect())
[(0, 0), (2, 2)]
>>> c = a.filter(lambda k, v : v % 2 != 0)
>>> list(c.collect())
[(1, 1)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def filter(self, func):
    """
    returns a new table containing only those keys which satisfy a predicate passed in via ``func``.

    Parameters
    ----------
    func: typing.Callable[[object, object], bool]
       Predicate function returning a boolean.

    Returns
    -------
    CTableABC
       A new table containing results.

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([0, 1, 2], include_key=False, partition=2)
    >>> b = a.filter(lambda k, v : k % 2 == 0)
    >>> list(b.collect())
    [(0, 0), (2, 2)]
    >>> c = a.filter(lambda k, v : v % 2 != 0)
    >>> list(c.collect())
    [(1, 1)]
    """
    ...
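
Since the predicate receives both the key and the value, it can test either one. A minimal plain-Python model of this, with filter_pairs as a hypothetical helper (not FATE's implementation) and keys that differ from values to make the distinction visible:

```python
from typing import Any, Callable, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def filter_pairs(pairs: Iterable[Pair], func: Callable[[Any, Any], bool]) -> List[Pair]:
    """Hypothetical model: keep the pairs for which func(key, value) is true."""
    return [(k, v) for k, v in pairs if func(k, v)]


pairs = [(0, 10), (1, 11), (2, 13)]
even_keys = filter_pairs(pairs, lambda k, v: k % 2 == 0)
odd_values = filter_pairs(pairs, lambda k, v: v % 2 != 0)
print(even_keys)   # [(0, 10), (2, 13)]
print(odd_values)  # [(1, 11), (2, 13)]
```
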
join(other, func) abstractmethod

returns intersection of this table and the other table.

function func will be applied to values of keys that exist in both tables.

Parameters:

Name Type Description Default
other

another table to be operated with.

required
func

the function applied to the values of keys that exist in both tables.

required

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.join(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(1, 3), (2, 5)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def join(self, other, func):
    """
    returns intersection of this table and the other table.

    function ``func`` will be applied to values of keys that exist in both table.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      the function applying to values whose key exists in both tables.
      default using left table's value.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)	# [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.join(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(1, 3), (2, 5)]
    """
    ...
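
In other words, join behaves like an inner join on keys. The following plain-Python sketch reproduces the example with lists of pairs; join_pairs is a hypothetical helper, not FATE's implementation, and it assumes keys are unique within each table.

```python
from typing import Any, Callable, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def join_pairs(left: Iterable[Pair], right: Iterable[Pair], func: Callable[[Any, Any], Any]) -> List[Pair]:
    """Hypothetical model: keep keys present in both inputs, combining values with func."""
    right_map = dict(right)
    return [(k, func(v, right_map[k])) for k, v in left if k in right_map]


a = [(0, 1), (1, 2), (2, 3)]  # what parallelize([1, 2, 3], include_key=False) holds
b = [(1, 1), (2, 2), (3, 3)]
print(join_pairs(a, b, lambda v1, v2: v1 + v2))  # [(1, 3), (2, 5)]
```
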
union(other, func=lambda v1, v2: v1) abstractmethod

returns union of this table and the other table.

function func will be applied to values of keys that exist in both tables.

Parameters:

Name Type Description Default
other

another table to be operated with.

required
func

The function applied to the values of keys that exist in both tables. By default, the left table's value is kept.

lambda v1, v2: v1

Returns:

Type Description
CTableABC

a new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)        # [(0, 1), (1, 2), (2, 3)]
>>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
>>> c = a.union(b, lambda v1, v2 : v1 + v2)
>>> list(c.collect())
[(0, 1), (1, 3), (2, 5), (3, 3)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def union(self, other, func=lambda v1, v2: v1):
    """
    returns union of this table and the other table.

    function ``func`` will be applied to values of keys that exist in both table.

    Parameters
    ----------
    other: CTableABC
      another table to be operated with.
    func: ``typing.Callable[[object, object], object]``
      The function applying to values whose key exists in both tables.
      default using left table's value.

    Returns
    -------
    CTableABC
      a new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize([1, 2, 3], include_key=False, partition=2)	# [(0, 1), (1, 2), (2, 3)]
    >>> b = computing_session.parallelize([(1, 1), (2, 2), (3, 3)], include_key=True, partition=2)
    >>> c = a.union(b, lambda v1, v2 : v1 + v2)
    >>> list(c.collect())
    [(0, 1), (1, 3), (2, 5), (3, 3)]
    """
    ...
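
The role of func and of its default can be seen in a small plain-Python model. union_pairs below is a hypothetical helper, not FATE's implementation; it assumes unique keys per table and sorts the result only to make the printed output stable.

```python
from typing import Any, Callable, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def union_pairs(
    left: Iterable[Pair],
    right: Iterable[Pair],
    func: Callable[[Any, Any], Any] = lambda v1, v2: v1,
) -> List[Pair]:
    """Hypothetical model: keep every key; apply func only where both sides have it."""
    merged = dict(left)
    for k, v in right:
        merged[k] = func(merged[k], v) if k in merged else v
    return sorted(merged.items())


a = [(0, 1), (1, 2), (2, 3)]
b = [(1, 1), (2, 2), (3, 3)]
print(union_pairs(a, b, lambda v1, v2: v1 + v2))  # [(0, 1), (1, 3), (2, 5), (3, 3)]
print(union_pairs(a, b))                          # [(0, 1), (1, 2), (2, 3), (3, 3)]
```
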
subtractByKey(other) abstractmethod

returns a new table containing elements only in this table but not in the other table.

Parameters:

Name Type Description Default
other

The table whose keys are removed from this table.

required

Returns:

Type Description
CTableABC

A new table

Examples:

>>> from fate_arch.session import computing_session
>>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
>>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
>>> c = a.subtractByKey(b)
>>> list(c.collect())
[(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
Source code in fate_arch/abc/_computing.py
@abc.abstractmethod
def subtractByKey(self, other):
    """
    returns a new table containing elements only in this table but not in the other table.

    Parameters
    ----------
    other: CTableABC
      Another table to be subtractbykey with.

    Returns
    -------
    CTableABC
      A new table

    Examples
    --------
    >>> from fate_arch.session import computing_session
    >>> a = computing_session.parallelize(range(10), include_key=False, partition=2)
    >>> b = computing_session.parallelize(range(5), include_key=False, partition=2)
    >>> c = a.subtractByKey(b)
    >>> list(c.collect())
    [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
    """
    ...
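
As the name suggests, only the keys of the other table matter; its values play no role. A plain-Python sketch of that behavior, with subtract_by_key as a hypothetical helper rather than FATE's implementation:

```python
from typing import Any, Iterable, List, Tuple

Pair = Tuple[Any, Any]


def subtract_by_key(left: Iterable[Pair], right: Iterable[Pair]) -> List[Pair]:
    """Hypothetical model: drop every pair whose key also appears in `right`."""
    right_keys = {k for k, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]


a = list(enumerate(range(10)))
b = [(k, "ignored") for k in range(5)]  # values differ from a's, keys 0..4 overlap
print(subtract_by_key(a, b))  # [(5, 5), (6, 6), (7, 7), (8, 8), (9, 9)]
```
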

Last updated: 2021-11-15