跳转至

Session

Session API

Session is the context for using computing, storage and federation resources. In most situations users need not be concerned with how a Session is created: FATE-Flow is responsible for creating and initializing the Session when a Task is launched.

For those who want to use the computing, storage and federation APIs outside a FATE-Flow Task, the following is a short guide.

  1. Initialize a Session

    sess = Session()
    sess.as_global()
    
    # the following calls are optional; make only the ones you need
    
    # init computing
    sess.init_computing(...)
    
    # init federation
    sess.init_federation(...)
    
    # init storage (there is no `init_storage`; `storage` takes keyword args)
    sess.storage(storage_engine=...)
    
  2. Call the specific APIs

    computing = sess.computing
    federation = sess.federation
    # `storage` is a method, not a property: call it to obtain a storage session
    storage = sess.storage(storage_engine=...)
    
  3. The computing API has a shortcut

    # shortcut: use the computing API through the `computing_session` module
    from fate_arch.session import computing_session
    computing_session.init(...)
    computing_session.parallelize(...)
    

Detailed API

Session(session_id=None, options=None)

Bases: object

Source code in fate_arch/session/_session.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def __init__(self, session_id: str = None, options=None):
    """Create a manager session bound to the configured default engines.

    Args:
        session_id: id of this manager session; a uuid1-based id is generated
            when it is empty.
        options: optional dict; a non-None ``"logger"`` entry replaces the
            module logger for this session.

    Raises:
        RuntimeError: when no default computing engine is configured.
    """
    if options is None:
        options = {}
    engines = engine_utils.get_engines()
    LOGGER.info(f"using engines: {engines}")
    computing_type = engines.get(EngineType.COMPUTING, None)
    if computing_type is None:
        raise RuntimeError("must set default engines on conf/service_conf.yaml")

    self._computing_type = computing_type
    self._federation_type = engines.get(EngineType.FEDERATION, None)
    self._storage_engine = engines.get(EngineType.STORAGE, None)
    self._computing_session: typing.Optional[CSessionABC] = None
    self._federation_session: typing.Optional[FederationABC] = None
    # storage sessions keyed by their storage session id
    self._storage_session: typing.Dict[str, StorageSessionABC] = {}
    self._parties_info: typing.Optional[PartiesInfo] = None
    self._all_party_info: typing.List[Party] = []
    self._session_id = session_id or str(uuid.uuid1())
    custom_logger = options.get("logger", None)
    self._logger = LOGGER if custom_logger is None else custom_logger

    self._logger.info(f"create manager session {self._session_id}")

    # init meta db tables
    init_database_tables()

Attributes

__GLOBAL_SESSION = None class-attribute
session_id: str property
computing: CSessionABC property
federation: FederationABC property
parties property
is_computing_valid property
is_federation_valid property

Functions

get_global() classmethod
Source code in fate_arch/session/_session.py
38
39
40
@classmethod
def get_global(cls):
    """Return the session registered as global, or ``None`` if none has been set."""
    global_session = cls.__GLOBAL_SESSION
    return global_session
as_global()
Source code in fate_arch/session/_session.py
46
47
48
def as_global(self):
    """Register this session as the global session and return it for chaining."""
    register = self._as_global
    register(self)
    return self
__enter__()
Source code in fate_arch/session/_session.py
85
86
def __enter__(self):
    """Open the session when entering a ``with`` block; yields ``self._open()``'s result."""
    opened = self._open()
    return opened
__exit__(exc_type, exc_val, exc_tb)
Source code in fate_arch/session/_session.py
88
89
90
91
def __exit__(self, exc_type, exc_val, exc_tb):
    """Log any exception escaping the ``with`` body, then close the session.

    Returns whatever ``self._close()`` returns; per the context-manager
    protocol a truthy value suppresses the exception.
    """
    exc_info = (exc_type, exc_val, exc_tb)
    if exc_tb:
        self._logger.exception("", exc_info=exc_info)
    return self._close()
init_computing(computing_session_id=None, record=True, **kwargs)
Source code in fate_arch/session/_session.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def init_computing(self,
                   computing_session_id: str = None,
                   record: bool = True,
                   **kwargs):
    """Create the computing session for the configured computing engine.

    Args:
        computing_session_id: id for the computing session; when empty, one is
            derived from the manager session id plus a uuid1.
        record: whether to persist a session record before the session is created.
        **kwargs: ``options`` (dict) is forwarded to the standalone and eggroll
            sessions; the spark-based engines ignore it.

    Returns:
        ``self``, for chaining.

    Raises:
        RuntimeError: if a computing session is already valid, or the computing
            engine is not supported.
    """
    computing_session_id = computing_session_id or f"{self._session_id}_computing_{uuid.uuid1()}"
    if self.is_computing_valid:
        raise RuntimeError("computing session already valid")

    # Reject unsupported engines before saving a record, so no orphaned record
    # is left behind for a session that can never be created.
    supported_engines = (ComputingEngine.STANDALONE, ComputingEngine.EGGROLL,
                         ComputingEngine.SPARK, ComputingEngine.LINKIS_SPARK)
    if self._computing_type not in supported_engines:
        raise RuntimeError(f"{self._computing_type} not supported")

    if record:
        self.save_record(engine_type=EngineType.COMPUTING,
                         engine_name=self._computing_type,
                         engine_session_id=computing_session_id)

    if self._computing_type == ComputingEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession

        self._computing_session = CSession(session_id=computing_session_id,
                                           options=kwargs.get("options", {}))
        return self

    if self._computing_type == ComputingEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession

        self._computing_session = CSession(session_id=computing_session_id,
                                           options=kwargs.get("options", {}))
        return self

    # SPARK and LINKIS_SPARK both use the spark computing session
    from fate_arch.computing.spark import CSession

    self._computing_session = CSession(session_id=computing_session_id)
    return self
init_federation(federation_session_id, *, runtime_conf=None, parties_info=None, service_conf=None, record=True)
Source code in fate_arch/session/_session.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def init_federation(
        self,
        federation_session_id: str,
        *,
        runtime_conf: typing.Optional[dict] = None,
        parties_info: typing.Optional[PartiesInfo] = None,
        service_conf: typing.Optional[dict] = None,
        record: bool = True,
):
    """Create the federation session for the configured federation engine.

    Args:
        federation_session_id: id of the federation session.
        runtime_conf: job runtime conf; used to build ``parties_info`` when that
            is not given, and its ``"role"`` mapping lists all parties.
        parties_info: parties of the job; built from ``runtime_conf`` when ``None``.
        service_conf: engine service config (eggroll reads ``host``/``port``;
            rabbitmq / pulsar receive it as their config).
        record: whether to persist a session record before creating the session.

    Returns:
        ``self``, for chaining.

    Raises:
        RuntimeError: if a federation session is already valid, if both
            ``parties_info`` and ``runtime_conf`` are ``None``, if the
            standalone/eggroll federation lacks a matching valid computing
            session, or if the federation engine is not supported.
    """
    # Validate before any side effect: a rejected call must neither write a
    # session record nor overwrite the parties of a running federation session.
    if self.is_federation_valid:
        raise RuntimeError("federation session already valid")
    if parties_info is None and runtime_conf is None:
        raise RuntimeError("`party_info` and `runtime_conf` are both `None`")
    supported_engines = (FederationEngine.STANDALONE, FederationEngine.EGGROLL,
                         FederationEngine.RABBITMQ, FederationEngine.PULSAR)
    if self._federation_type not in supported_engines:
        raise RuntimeError(f"{self._federation_type} not supported")

    if record:
        self.save_record(engine_type=EngineType.FEDERATION,
                         engine_name=self._federation_type,
                         engine_session_id=federation_session_id,
                         engine_runtime_conf={"runtime_conf": runtime_conf, "service_conf": service_conf})
    if parties_info is None:
        parties_info = PartiesInfo.from_conf(runtime_conf)
    self._parties_info = parties_info
    if runtime_conf is None:
        # NOTE(review): with only parties_info there is no "role" mapping to read,
        # so the full party list stays empty (destroy() then gets parties=[]) — confirm
        self._all_party_info = []
    else:
        self._all_party_info = [Party(k, p) for k, v in runtime_conf['role'].items() for p in v]

    if self._federation_type == FederationEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession
        from fate_arch.federation.standalone import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.STANDALONE} valid"
            )

        self._federation_session = Federation(
            standalone_session=self._computing_session.get_standalone_session(),
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
        )
        return self

    if self._federation_type == FederationEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession
        from fate_arch.federation.eggroll import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.EGGROLL} valid"
            )

        self._federation_session = Federation(
            rp_ctx=self._computing_session.get_rpc(),
            rs_session_id=federation_session_id,
            party=parties_info.local_party,
            proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
        )
        return self

    if self._federation_type == FederationEngine.RABBITMQ:
        from fate_arch.federation.rabbitmq import Federation

        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            rabbitmq_config=service_conf,
        )
        return self

    if self._federation_type == FederationEngine.PULSAR:
        from fate_arch.federation.pulsar import Federation

        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            pulsar_config=service_conf,
        )
        return self

    # only reachable if supported_engines and the branches above drift apart
    raise RuntimeError(f"{self._federation_type} not supported")
get_table(name, namespace, ignore_disable=False)
Source code in fate_arch/session/_session.py
289
290
291
292
293
294
295
296
297
298
def get_table(self, name, namespace, ignore_disable=False) -> typing.Union[StorageTableABC, None]:
    """Open the storage table ``namespace``/``name``.

    Returns ``None`` when no table meta exists. Raises ``Exception`` when the
    table is marked disabled, unless ``ignore_disable`` is true.
    """
    table_meta = Session.get_table_meta(name=name, namespace=namespace)
    if table_meta is None:
        return None
    if table_meta.get_disable() and not ignore_disable:
        raise Exception(f"table {namespace} {name} disable: {table_meta.get_disable()}")
    storage_session = self._get_or_create_storage(storage_engine=table_meta.get_engine())
    return storage_session.get_table(name=name, namespace=namespace)
get_table_meta(name, namespace) classmethod
Source code in fate_arch/session/_session.py
300
301
302
303
@classmethod
def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
    """Look up the storage table meta for ``namespace``/``name`` via ``StorageSessionBase``."""
    return StorageSessionBase.get_table_meta(name=name, namespace=namespace)
persistent(computing_table, namespace, name, schema=None, part_of_data=None, engine=None, engine_address=None, store_type=None, token=None) classmethod
Source code in fate_arch/session/_session.py
305
306
307
308
309
310
311
312
313
314
315
316
@classmethod
def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
               engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
    """Persist ``computing_table`` as storage table ``namespace``/``name``.

    Thin wrapper around ``StorageSessionBase.persistent``; every argument is
    forwarded unchanged and the resulting table meta is returned.
    """
    return StorageSessionBase.persistent(
        computing_table=computing_table, namespace=namespace, name=name,
        schema=schema, part_of_data=part_of_data,
        engine=engine, engine_address=engine_address,
        store_type=store_type, token=token,
    )
storage(**kwargs)
Source code in fate_arch/session/_session.py
326
327
def storage(self, **kwargs):
    """Return a storage session; keyword arguments go to ``_get_or_create_storage``."""
    storage_session = self._get_or_create_storage(**kwargs)
    return storage_session
save_record(engine_type, engine_name, engine_session_id, engine_runtime_conf=None)
Source code in fate_arch/session/_session.py
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
@DB.connection_context()
def save_record(self, engine_type, engine_name, engine_session_id, engine_runtime_conf=None):
    """Insert a ``SessionRecord`` linking an engine session to this manager session.

    Args:
        engine_type: engine category (computing / storage / federation).
        engine_name: concrete engine name.
        engine_session_id: id of the engine session being recorded.
        engine_runtime_conf: optional dict stored in ``f_engine_address``.

    Raises:
        RuntimeError: if the insert reports a row count other than 1, or fails
            with any error other than ``peewee.IntegrityError``.
    """
    self._logger.info(
        f"try to save session record for manager {self._session_id}, {engine_type} {engine_name}"
        f" {engine_session_id}")
    session_record = SessionRecord()
    session_record.f_manager_session_id = self._session_id
    session_record.f_engine_type = engine_type
    session_record.f_engine_name = engine_name
    session_record.f_engine_session_id = engine_session_id
    session_record.f_engine_address = engine_runtime_conf if engine_runtime_conf else {}
    session_record.f_create_time = base_utils.current_timestamp()
    msg = f"save session record for manager {self._session_id}, {engine_type} {engine_name} " \
          f"{engine_session_id}"
    try:
        effect_count = session_record.save(force_insert=True)
    except peewee.IntegrityError as e:
        # presumably the record already exists; tolerated, but not reported as saved
        self._logger.warning(e)
        return
    except Exception as e:
        raise RuntimeError(f"{msg} exception", e) from e
    # checked outside the try so this error is not re-wrapped by the handler above
    if effect_count != 1:
        raise RuntimeError(f"{msg} failed")
    self._logger.info(
        f"save session record for manager {self._session_id}, {engine_type} {engine_name} "
        f"{engine_session_id} successfully")
delete_session_record(engine_session_id, manager_session_id=None)
Source code in fate_arch/session/_session.py
367
368
369
370
371
372
373
374
375
376
377
@DB.connection_context()
def delete_session_record(self, engine_session_id, manager_session_id=None):
    """Delete the session record(s) of ``engine_session_id``.

    When ``manager_session_id`` is given, only records belonging to that
    manager session match. Logs a warning when no row was deleted.
    """
    conditions = [SessionRecord.f_engine_session_id == engine_session_id]
    if manager_session_id:
        conditions.append(SessionRecord.f_manager_session_id == manager_session_id)
    deleted_rows = SessionRecord.delete().where(*conditions).execute()
    if deleted_rows <= 0:
        self._logger.warning(f"delete session {engine_session_id} record failed")
        return
    self._logger.info(f"delete session {engine_session_id} record successfully")
query_sessions(reverse=None, order_by=None, **kwargs) classmethod
Source code in fate_arch/session/_session.py
379
380
381
382
383
384
385
386
@classmethod
@DB.connection_context()
def query_sessions(cls, reverse=None, order_by=None, **kwargs):
    """Query session records filtered by ``kwargs``.

    Best effort: an ``Exception`` raised by the query is logged and ``[]`` is
    returned. ``KeyboardInterrupt``/``SystemExit`` are no longer swallowed.
    """
    try:
        return SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)
    except Exception as e:
        LOGGER.warning(f"query session records failed: {e}")
        return []
get_session_from_record(**kwargs)
Source code in fate_arch/session/_session.py
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
@DB.connection_context()
def get_session_from_record(self, **kwargs):
    """Re-attach the engine sessions recorded under this manager session.

    Records come from ``query_sessions`` (extra ``kwargs`` narrow the query).
    Computing, storage and federation records are passed to
    ``_init_computing_if_not_valid``, ``_get_or_create_storage`` and
    ``_init_federation_if_not_valid`` respectively. When that raises, the error
    is logged and the record is deleted.
    """
    self._logger.info(f"query by manager session id {self._session_id}")
    records = self.query_sessions(manager_session_id=self.session_id, **kwargs)
    self._logger.info([rec.f_engine_session_id for rec in records])
    for rec in records:
        try:
            engine_session_id = rec.f_engine_session_id
            engine_type = rec.f_engine_type
            if engine_type == EngineType.COMPUTING:
                self._init_computing_if_not_valid(computing_session_id=engine_session_id)
            elif engine_type == EngineType.STORAGE:
                self._get_or_create_storage(storage_session_id=engine_session_id,
                                            storage_engine=rec.f_engine_name,
                                            record=False)
            elif engine_type == EngineType.FEDERATION:
                self._logger.info(f"engine runtime conf: {rec.f_engine_address}")
                self._init_federation_if_not_valid(federation_session_id=engine_session_id,
                                                   engine_runtime_conf=rec.f_engine_address)
        except Exception as e:
            self._logger.info(e)
            self.delete_session_record(engine_session_id=rec.f_engine_session_id)
destroy_all_sessions(**kwargs)
Source code in fate_arch/session/_session.py
445
446
447
448
449
450
451
def destroy_all_sessions(self, **kwargs):
    """Reload recorded sessions, then tear down federation, storage and computing, in that order."""
    self._logger.info(f"start destroy manager session {self._session_id} all sessions")
    self.get_session_from_record(**kwargs)
    teardown_steps = (
        self.destroy_federation_session,
        self.destroy_storage_session,
        self.destroy_computing_session,
    )
    for teardown in teardown_steps:
        teardown()
    self._logger.info(f"finish destroy manager session {self._session_id} all sessions")
destroy_computing_session()
Source code in fate_arch/session/_session.py
453
454
455
456
457
458
459
460
461
462
def destroy_computing_session(self):
    """Destroy the active computing session, if any, and delete its session record.

    A failure inside ``destroy()`` is logged; the record is still deleted and
    the session reference is cleared afterwards.
    """
    if not self.is_computing_valid:
        return
    try:
        self._logger.info(f"try to destroy computing session {self._computing_session.session_id}")
        self._computing_session.destroy()
    except Exception as e:
        # Interpolate the error into the message: passing `e` as an extra logging
        # argument to a message without a %-placeholder breaks record formatting.
        self._logger.info(f"destroy computing session {self._computing_session.session_id} failed: {e}")

    self.delete_session_record(engine_session_id=self._computing_session.session_id)
    self._computing_session = None
destroy_storage_session()
Source code in fate_arch/session/_session.py
464
465
466
467
468
469
470
471
472
473
474
475
def destroy_storage_session(self):
    """Destroy every tracked storage session and delete each one's session record.

    A failure while destroying one session is logged with its traceback and
    does not stop the others; all records are deleted and the mapping is reset.
    """
    for session_id, session in self._storage_session.items():
        try:
            self._logger.info(f"try to destroy storage session {session_id}")
            session.destroy()
            self._logger.info(f"destroy storage session {session_id} successfully")
        except Exception:
            # logger.exception already attaches the active exception; passing `e`
            # as a format argument to a placeholder-less message breaks formatting
            self._logger.exception(f"destroy storage session {session_id} failed")

        self.delete_session_record(engine_session_id=session_id)

    self._storage_session = {}
destroy_federation_session()
Source code in fate_arch/session/_session.py
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def destroy_federation_session(self):
    """Tear down the active federation session, if any, and delete its record.

    Remote teardown is skipped when the local party's role is ``"local"``. Any
    exception during teardown is logged; the record (scoped to this manager
    session) is still deleted and the session reference cleared.
    """
    if not self.is_federation_valid:
        return

    federation = self._federation_session
    try:
        if self._parties_info.local_party.role != "local":
            self._logger.info(
                f"try to destroy federation session {federation.session_id} type"
                f" {EngineType.FEDERATION} role {self._parties_info.local_party.role}")
            federation.destroy(parties=self._all_party_info)
            self._logger.info(f"destroy federation session {federation.session_id} done")
    except Exception as e:
        self._logger.info(f"destroy federation failed: {e}")

    self.delete_session_record(engine_session_id=federation.session_id,
                               manager_session_id=self.session_id)
    self._federation_session = None
wait_remote_all_done(timeout=None)
Source code in fate_arch/session/_session.py
493
494
495
496
def wait_remote_all_done(self, timeout=None):
    """Wait for the remote futures tracked by ``remote_status``.

    ``timeout`` is passed straight to ``remote_status.wait_all_remote_done``;
    the pending futures are logged before and after waiting.
    """
    def _log_futures(suffix):
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, {suffix}")

    _log_futures("waiting...")
    remote_status.wait_all_remote_done(timeout)
    _log_futures("all done")

最后更新: 2021-11-15