Skip to content

Session

Session API

Session is the context for using computing, storage and federation resources. In most situations, users do not need to be concerned with how a Session is created: FATE-Flow is responsible for creating and initializing the Session when a Task is launched.

For those who want to use the computing, storage and federation APIs outside a FATE-Flow Task, the following is a short guide.

  1. init Session

    sess = Session()
    sess.as_global()
    
    # the following is optional, call if needed
    
    # init computing
    sess.init_computing(...)
    
    # init federation
    sess.init_federation(...)
    
    # init storage
    sess.init_storage(...)
    
  2. calling specific api

    computing = sess.computing
    federation = sess.federation
    storage = sess.storage
    
  3. computing api has a shortcut

    from fate_arch.session import computing_session
    computing_session.init(...)
    computing_session.parallelize(...)
    

Detailed API

Session

Source code in fate_arch/session/_session.py
class Session(object):
    """Manager session owning one computing, one federation and many storage sessions.

    The engines to use are read from ``engine_utils.get_engines()`` when the
    instance is created. Computing and storage sessions created through this
    class are recorded as ``SessionRecord`` rows so that
    ``destroy_all_sessions`` can find and destroy them later.

    The instance is a context manager: leaving the ``with`` block calls
    ``destroy_all_sessions``.
    """
    __GLOBAL_SESSION = None

    @classmethod
    def get_global(cls):
        """Return the session registered with ``as_global`` (``None`` until one is registered)."""
        return cls.__GLOBAL_SESSION

    @classmethod
    def _as_global(cls, sess):
        """Store ``sess`` as the class-level global session."""
        cls.__GLOBAL_SESSION = sess

    def as_global(self):
        """Register this instance as the global session and return it for chaining."""
        self._as_global(self)
        return self

    def __init__(self, session_id: str = None, options=None):
        """Create a manager session.

        Args:
            session_id: manager session id; a ``uuid1`` string is generated
                when it is empty or ``None``.
            options: optional dict; its ``"logger"`` entry, when not ``None``,
                replaces the module-level ``LOGGER`` for this instance.

        Raises:
            RuntimeError: no computing engine is configured.
        """
        if options is None:
            options = {}
        engines = engine_utils.get_engines()
        LOGGER.info(f"using engines: {engines}")
        computing_type = engines.get(EngineType.COMPUTING, None)
        if computing_type is None:
            raise RuntimeError("must set default engines on conf/service_conf.yaml")

        self._computing_type = computing_type
        self._federation_type = engines.get(EngineType.FEDERATION, None)
        self._storage_engine = engines.get(EngineType.STORAGE, None)
        self._computing_session: typing.Optional[CSessionABC] = None
        self._federation_session: typing.Optional[FederationABC] = None
        # storage session id -> storage session
        self._storage_session: typing.Dict[str, StorageSessionABC] = {}
        self._parties_info: typing.Optional[PartiesInfo] = None
        self._session_id = session_id if session_id else str(uuid.uuid1())
        custom_logger = options.get("logger", None)
        self._logger = LOGGER if custom_logger is None else custom_logger

        self._logger.info(f"create manager session {self._session_id}")

        # init meta db
        init_database_tables()

    @property
    def session_id(self) -> str:
        """Manager session id."""
        return self._session_id

    def _open(self):
        return self

    def _close(self):
        self.destroy_all_sessions()

    def __enter__(self):
        return self._open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Log the in-flight exception (if any) and destroy all sessions.

        Returns the result of ``_close()``, which is ``None``, so an exception
        raised inside the ``with`` block is not suppressed.
        """
        if exc_tb:
            self._logger.exception("", exc_info=(exc_type, exc_val, exc_tb))
        return self._close()

    def init_computing(self,
                       computing_session_id: str = None,
                       record: bool = True,
                       **kwargs):
        """Create the computing session for the configured computing engine.

        Args:
            computing_session_id: id for the new session; generated from the
                manager session id and a ``uuid1`` when empty.
            record: when true, a session record is saved before the engine
                session is created.
            **kwargs: only ``options`` is read, and only for the EGGROLL engine.

        Returns:
            This ``Session``.

        Raises:
            RuntimeError: a computing session already exists, or the
                configured engine is not supported.
        """
        if not computing_session_id:
            computing_session_id = f"{self._session_id}_computing_{uuid.uuid1()}"
        if self.is_computing_valid:
            raise RuntimeError("computing session already valid")

        if record:
            self.save_record(engine_type=EngineType.COMPUTING,
                             engine_name=self._computing_type,
                             engine_session_id=computing_session_id)

        if self._computing_type == ComputingEngine.STANDALONE:
            from fate_arch.computing.standalone import CSession

            self._computing_session = CSession(session_id=computing_session_id)
        elif self._computing_type == ComputingEngine.EGGROLL:
            from fate_arch.computing.eggroll import CSession

            self._computing_session = CSession(
                session_id=computing_session_id, options=kwargs.get("options", {})
            )
        elif self._computing_type in (ComputingEngine.SPARK, ComputingEngine.LINKIS_SPARK):
            # both engine names are served by the spark CSession
            from fate_arch.computing.spark import CSession

            self._computing_session = CSession(session_id=computing_session_id)
        else:
            raise RuntimeError(f"{self._computing_type} not supported")
        return self

    def init_federation(
            self,
            federation_session_id: str,
            *,
            runtime_conf: typing.Optional[dict] = None,
            parties_info: typing.Optional[PartiesInfo] = None,
            service_conf: typing.Optional[dict] = None,
    ):
        """Create the federation session for the configured federation engine.

        Args:
            federation_session_id: id handed to the federation implementation.
            runtime_conf: used to build ``PartiesInfo`` when ``parties_info``
                is not given; also forwarded to the RabbitMQ / Pulsar
                federation.
            parties_info: parties description; takes precedence over
                ``runtime_conf``.
            service_conf: engine configuration. For EGGROLL its ``host`` and
                ``port`` entries form the proxy endpoint; for RabbitMQ and
                Pulsar it is forwarded as the engine config.

        Returns:
            This ``Session``.

        Raises:
            RuntimeError: a federation session already exists, both
                ``parties_info`` and ``runtime_conf`` are ``None``, the
                matching computing session is missing or of the wrong type,
                or the configured engine is not supported.
        """
        # Validate before touching any state so a rejected call leaves the
        # previously stored parties info intact.
        if self.is_federation_valid:
            raise RuntimeError("federation session already valid")

        if parties_info is None:
            if runtime_conf is None:
                raise RuntimeError("`parties_info` and `runtime_conf` are both `None`")
            parties_info = PartiesInfo.from_conf(runtime_conf)
        self._parties_info = parties_info

        if self._federation_type == FederationEngine.STANDALONE:
            from fate_arch.computing.standalone import CSession
            from fate_arch.federation.standalone import Federation

            if not self.is_computing_valid or not isinstance(
                    self._computing_session, CSession
            ):
                raise RuntimeError(
                    f"require computing with type {ComputingEngine.STANDALONE} valid"
                )

            self._federation_session = Federation(
                standalone_session=self._computing_session.get_standalone_session(),
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
            )
            return self

        if self._federation_type == FederationEngine.EGGROLL:
            from fate_arch.computing.eggroll import CSession
            from fate_arch.federation.eggroll import Federation

            if not self.is_computing_valid or not isinstance(
                    self._computing_session, CSession
            ):
                raise RuntimeError(
                    f"require computing with type {ComputingEngine.EGGROLL} valid"
                )

            self._federation_session = Federation(
                rp_ctx=self._computing_session.get_rpc(),
                rs_session_id=federation_session_id,
                party=parties_info.local_party,
                proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
            )
            return self

        if self._federation_type == FederationEngine.RABBITMQ:
            from fate_arch.computing.spark import CSession
            from fate_arch.federation.rabbitmq import Federation

            if not self.is_computing_valid or not isinstance(
                    self._computing_session, CSession
            ):
                raise RuntimeError(
                    f"require computing with type {ComputingEngine.SPARK} valid"
                )

            self._federation_session = Federation.from_conf(
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
                runtime_conf=runtime_conf,
                rabbitmq_config=service_conf,
            )
            return self

        # Add pulsar support
        if self._federation_type == FederationEngine.PULSAR:
            from fate_arch.computing.spark import CSession
            from fate_arch.federation.pulsar import Federation

            if not self.is_computing_valid or not isinstance(
                    self._computing_session, CSession
            ):
                raise RuntimeError(
                    f"require computing with type {ComputingEngine.SPARK} valid"
                )

            self._federation_session = Federation.from_conf(
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
                runtime_conf=runtime_conf,
                pulsar_config=service_conf,
            )
            return self

        raise RuntimeError(f"{self._federation_type} not supported")

    def _get_or_create_storage(self,
                               storage_session_id=None,
                               storage_engine=None,
                               record: bool = True,
                               **kwargs) -> StorageSessionABC:
        """Return a cached storage session or create one for ``storage_engine``.

        Lookup order: a cached session with the same id, then any cached
        session using the same engine, then a newly created session.

        Args:
            storage_session_id: id to look up / create; generated from the
                manager session id and a ``uuid1`` when empty.
            storage_engine: engine name; defaults to the configured storage
                engine.
            record: when true, a session record is saved before a new session
                is created.
            **kwargs: only ``options`` is read and passed to the new session.

        Raises:
            NotImplementedError: ``storage_engine`` is not a supported engine.
        """
        if not storage_session_id:
            storage_session_id = f"{self._session_id}_storage_{uuid.uuid1()}"

        if storage_session_id in self._storage_session:
            return self._storage_session[storage_session_id]

        if storage_engine is None:
            storage_engine = self._storage_engine

        # One session per engine is enough: reuse it even under another id.
        for session in self._storage_session.values():
            if storage_engine == session.engine:
                return session

        if record:
            self.save_record(engine_type=EngineType.STORAGE,
                             engine_name=storage_engine,
                             engine_session_id=storage_session_id)

        # Every engine's StorageSession is built with the same arguments, so
        # only the import differs per engine.
        if storage_engine == StorageEngine.EGGROLL:
            from fate_arch.storage.eggroll import StorageSession
        elif storage_engine == StorageEngine.STANDALONE:
            from fate_arch.storage.standalone import StorageSession
        elif storage_engine == StorageEngine.MYSQL:
            from fate_arch.storage.mysql import StorageSession
        elif storage_engine == StorageEngine.HDFS:
            from fate_arch.storage.hdfs import StorageSession
        elif storage_engine == StorageEngine.HIVE:
            from fate_arch.storage.hive import StorageSession
        elif storage_engine == StorageEngine.LINKIS_HIVE:
            from fate_arch.storage.linkis_hive import StorageSession
        elif storage_engine == StorageEngine.PATH:
            from fate_arch.storage.path import StorageSession
        elif storage_engine == StorageEngine.LOCALFS:
            from fate_arch.storage.localfs import StorageSession
        else:
            raise NotImplementedError(f"can not be initialized with storage engine: {storage_engine}")

        storage_session = StorageSession(session_id=storage_session_id, options=kwargs.get("options", {}))
        self._storage_session[storage_session_id] = storage_session

        return storage_session

    def get_table(self, name, namespace) -> typing.Union[StorageTableABC, None]:
        """Return the storage table ``namespace``/``name``, or ``None`` when it has no meta."""
        meta = Session.get_table_meta(name=name, namespace=namespace)
        if meta is None:
            return None
        engine = meta.get_engine()
        storage_session = self._get_or_create_storage(storage_engine=engine)
        table = storage_session.get_table(name=name, namespace=namespace)
        return table

    @classmethod
    def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
        """Delegate to ``StorageSessionBase.get_table_meta``."""
        meta = StorageSessionBase.get_table_meta(name=name, namespace=namespace)
        return meta

    @classmethod
    def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
                   engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
        """Delegate to ``StorageSessionBase.persistent`` with the same arguments."""
        return StorageSessionBase.persistent(computing_table=computing_table,
                                             namespace=namespace,
                                             name=name,
                                             schema=schema,
                                             part_of_data=part_of_data,
                                             engine=engine,
                                             engine_address=engine_address,
                                             store_type=store_type,
                                             token=token)

    @property
    def computing(self) -> CSessionABC:
        """Computing session, or ``None`` before ``init_computing``."""
        return self._computing_session

    @property
    def federation(self) -> FederationABC:
        """Federation session, or ``None`` before ``init_federation``."""
        return self._federation_session

    def storage(self, **kwargs):
        """Get or create a storage session; see ``_get_or_create_storage`` for the arguments."""
        return self._get_or_create_storage(**kwargs)

    @property
    def parties(self):
        """Parties info stored by ``init_federation`` (``None`` before that)."""
        return self._parties_info

    @property
    def is_computing_valid(self):
        return self._computing_session is not None

    @property
    def is_federation_valid(self):
        return self._federation_session is not None

    @DB.connection_context()
    def save_record(self, engine_type, engine_name, engine_session_id):
        """Insert a ``SessionRecord`` row linking an engine session to this manager session.

        An ``IntegrityError`` from the insert is logged as a warning and
        otherwise ignored.

        Raises:
            RuntimeError: the insert raised any other exception (chained as
                ``__cause__``) or did not report exactly one affected row.
        """
        self._logger.info(
            f"try to save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id}")
        session_record = SessionRecord()
        session_record.f_manager_session_id = self._session_id
        session_record.f_engine_type = engine_type
        session_record.f_engine_name = engine_name
        session_record.f_engine_session_id = engine_session_id
        # TODO: engine address
        session_record.f_engine_address = {}
        session_record.f_create_time = base_utils.current_timestamp()
        msg = f"save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id}"
        try:
            effect_count = session_record.save(force_insert=True)
        except peewee.IntegrityError as e:
            # Presumably the record already exists (duplicate key) — nothing
            # was inserted, so do not report success.
            self._logger.warning(e)
            return
        except Exception as e:
            raise RuntimeError(f"{msg} exception") from e
        if effect_count != 1:
            raise RuntimeError(f"{msg} failed")
        self._logger.info(
            f"save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id} successfully")

    @DB.connection_context()
    def delete_session_record(self, engine_session_id):
        """Delete the ``SessionRecord`` rows of ``engine_session_id`` and log the outcome."""
        rows = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id).execute()
        if rows > 0:
            self._logger.info(f"delete session {engine_session_id} record successfully")
        else:
            self._logger.warning(f"delete session {engine_session_id} record failed")

    @classmethod
    @DB.connection_context()
    def query_sessions(cls, reverse=None, order_by=None, **kwargs):
        """Delegate to ``SessionRecord.query`` with the same arguments."""
        return SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)

    @DB.connection_context()
    def get_session_from_record(self, **kwargs):
        """Re-attach the engine sessions recorded for this manager session.

        Records whose session cannot be re-created are logged and deleted.
        ``kwargs`` are extra filters forwarded to ``query_sessions``.
        """
        self._logger.info(f"query by manager session id {self._session_id}")
        session_records = self.query_sessions(manager_session_id=self._session_id, **kwargs)
        self._logger.info([session_record.f_engine_session_id for session_record in session_records])
        for session_record in session_records:
            try:
                engine_session_id = session_record.f_engine_session_id
                if session_record.f_engine_type == EngineType.COMPUTING:
                    self._init_computing_if_not_valid(computing_session_id=engine_session_id)
                elif session_record.f_engine_type == EngineType.STORAGE:
                    self._get_or_create_storage(storage_session_id=engine_session_id,
                                                storage_engine=session_record.f_engine_name,
                                                record=False)
            except Exception as e:
                self._logger.error(e)
                self.delete_session_record(engine_session_id=session_record.f_engine_session_id)

    def _init_computing_if_not_valid(self, computing_session_id):
        """Ensure a computing session exists.

        Returns ``False`` only when a computing session with a different id
        is already attached; ``True`` otherwise.
        """
        if not self.is_computing_valid:
            self.init_computing(computing_session_id=computing_session_id, record=False)
            return True
        elif self._computing_session.session_id != computing_session_id:
            self._logger.warning(
                f"manager session had computing session {self._computing_session.session_id} different with query from db session {computing_session_id}")
            return False
        else:
            # already exists
            return True

    def destroy_all_sessions(self, **kwargs):
        """Re-attach recorded sessions, then destroy storage and computing sessions."""
        self._logger.info(f"start destroy manager session {self._session_id} all sessions")
        self.get_session_from_record(**kwargs)
        self.destroy_storage_session()
        self.destroy_computing_session()
        self._logger.info(f"finish destroy manager session {self._session_id} all sessions")

    def destroy_computing_session(self):
        """Stop (or, failing that, kill) the computing session and delete its record.

        Failures are logged with their traceback and never raised; the
        session record is deleted either way.
        """
        if self.is_computing_valid:
            session_id = self._computing_session.session_id
            try:
                self._logger.info(f"try to destroy computing session {session_id}")
                try:
                    self._computing_session.stop()
                except Exception:
                    # graceful stop failed: fall back to a hard kill
                    self._computing_session.kill()
                self._logger.info(f"destroy computing session {session_id} successfully")
            except Exception:
                self._logger.exception(f"destroy computing session {session_id} failed")
            self.delete_session_record(engine_session_id=session_id)

    def destroy_storage_session(self):
        """Destroy every cached storage session and delete its record.

        A failure to destroy one session is logged with its traceback and
        does not stop the remaining sessions from being processed.
        """
        for session_id, session in self._storage_session.items():
            try:
                self._logger.info(f"try to destroy storage session {session_id}")
                session.destroy()
                self._logger.info(f"destroy storage session {session_id} successfully")
            except Exception:
                self._logger.exception(f"destroy storage session {session_id} failed")
            self.delete_session_record(engine_session_id=session_id)

    def wait_remote_all_done(self, timeout=None):
        """Call ``remote_status.wait_all_remote_done(timeout)``, logging the remote futures before and after."""
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, waiting...")
        remote_status.wait_all_remote_done(timeout)
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, all done")
session_id: str property readonly
computing: CSessionABC property readonly
federation: FederationABC property readonly
parties property readonly
is_computing_valid property readonly
is_federation_valid property readonly
get_global() classmethod
Source code in fate_arch/session/_session.py
@classmethod
def get_global(cls):
    """Return the value held in the class attribute ``__GLOBAL_SESSION``."""
    return cls.__GLOBAL_SESSION
as_global(self)
Source code in fate_arch/session/_session.py
def as_global(self):
    """Pass this instance to ``_as_global`` and return it, so the call can be chained."""
    self._as_global(self)
    return self
__init__(self, session_id=None, options=None) special
Source code in fate_arch/session/_session.py
def __init__(self, session_id: str = None, options=None):
    """Create a manager session.

    Args:
        session_id: manager session id; a ``uuid1`` string is generated when
            it is empty or ``None``.
        options: optional dict; its ``"logger"`` entry, when not ``None``,
            replaces the module-level ``LOGGER`` for this instance.

    Raises:
        RuntimeError: no computing engine is configured.
    """
    if options is None:
        options = {}
    engines = engine_utils.get_engines()
    LOGGER.info(f"using engines: {engines}")
    computing_type = engines.get(EngineType.COMPUTING, None)
    if computing_type is None:
        raise RuntimeError("must set default engines on conf/service_conf.yaml")

    self._computing_type = computing_type
    self._federation_type = engines.get(EngineType.FEDERATION, None)
    self._storage_engine = engines.get(EngineType.STORAGE, None)
    self._computing_session: typing.Optional[CSessionABC] = None
    self._federation_session: typing.Optional[FederationABC] = None
    # storage session id -> storage session
    self._storage_session: typing.Dict[str, StorageSessionABC] = {}
    self._parties_info: typing.Optional[PartiesInfo] = None
    self._session_id = session_id if session_id else str(uuid.uuid1())
    custom_logger = options.get("logger", None)
    self._logger = LOGGER if custom_logger is None else custom_logger

    self._logger.info(f"create manager session {self._session_id}")

    # init meta db
    init_database_tables()
__enter__(self) special
Source code in fate_arch/session/_session.py
def __enter__(self):
    """Enter the ``with`` block; the bound value is whatever ``_open()`` returns."""
    return self._open()
__exit__(self, exc_type, exc_val, exc_tb) special
Source code in fate_arch/session/_session.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Log the in-flight exception, if any, then return the result of ``_close()``."""
    # A traceback is only present when the ``with`` body raised.
    if exc_tb:
        self._logger.exception("", exc_info=(exc_type, exc_val, exc_tb))
    return self._close()
init_computing(self, computing_session_id=None, record=True, **kwargs)
Source code in fate_arch/session/_session.py
def init_computing(self,
                   computing_session_id: str = None,
                   record: bool = True,
                   **kwargs):
    """Create the computing session for the configured computing engine.

    Args:
        computing_session_id: id for the new session; generated from the
            manager session id and a ``uuid1`` when empty.
        record: when true, a session record is saved before the engine
            session is created.
        **kwargs: only ``options`` is read, and only for the EGGROLL engine.

    Returns:
        This session object.

    Raises:
        RuntimeError: a computing session already exists, or the configured
            engine is not supported.
    """
    if not computing_session_id:
        computing_session_id = f"{self._session_id}_computing_{uuid.uuid1()}"
    if self.is_computing_valid:
        raise RuntimeError("computing session already valid")

    if record:
        self.save_record(engine_type=EngineType.COMPUTING,
                         engine_name=self._computing_type,
                         engine_session_id=computing_session_id)

    engine = self._computing_type
    if engine == ComputingEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession

        self._computing_session = CSession(session_id=computing_session_id)
    elif engine == ComputingEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession

        self._computing_session = CSession(
            session_id=computing_session_id, options=kwargs.get("options", {})
        )
    elif engine in (ComputingEngine.SPARK, ComputingEngine.LINKIS_SPARK):
        # both engine names are served by the spark CSession
        from fate_arch.computing.spark import CSession

        self._computing_session = CSession(session_id=computing_session_id)
    else:
        raise RuntimeError(f"{self._computing_type} not supported")
    return self
init_federation(self, federation_session_id, *, runtime_conf=None, parties_info=None, service_conf=None)
Source code in fate_arch/session/_session.py
def init_federation(
        self,
        federation_session_id: str,
        *,
        runtime_conf: typing.Optional[dict] = None,
        parties_info: typing.Optional[PartiesInfo] = None,
        service_conf: typing.Optional[dict] = None,
):
    """Create the federation session for the configured federation engine.

    Args:
        federation_session_id: id handed to the federation implementation.
        runtime_conf: used to build ``PartiesInfo`` when ``parties_info`` is
            not given; also forwarded to the RabbitMQ / Pulsar federation.
        parties_info: parties description; takes precedence over
            ``runtime_conf``.
        service_conf: engine configuration. For EGGROLL its ``host`` and
            ``port`` entries form the proxy endpoint; for RabbitMQ and Pulsar
            it is forwarded as the engine config.

    Returns:
        This session object.

    Raises:
        RuntimeError: a federation session already exists, both
            ``parties_info`` and ``runtime_conf`` are ``None``, the matching
            computing session is missing or of the wrong type, or the
            configured engine is not supported.
    """
    # Validate before touching any state so a rejected call leaves the
    # previously stored parties info intact.
    if self.is_federation_valid:
        raise RuntimeError("federation session already valid")

    if parties_info is None:
        if runtime_conf is None:
            raise RuntimeError("`parties_info` and `runtime_conf` are both `None`")
        parties_info = PartiesInfo.from_conf(runtime_conf)
    self._parties_info = parties_info

    if self._federation_type == FederationEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession
        from fate_arch.federation.standalone import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.STANDALONE} valid"
            )

        self._federation_session = Federation(
            standalone_session=self._computing_session.get_standalone_session(),
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
        )
        return self

    if self._federation_type == FederationEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession
        from fate_arch.federation.eggroll import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.EGGROLL} valid"
            )

        self._federation_session = Federation(
            rp_ctx=self._computing_session.get_rpc(),
            rs_session_id=federation_session_id,
            party=parties_info.local_party,
            proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
        )
        return self

    if self._federation_type == FederationEngine.RABBITMQ:
        from fate_arch.computing.spark import CSession
        from fate_arch.federation.rabbitmq import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.SPARK} valid"
            )

        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            rabbitmq_config=service_conf,
        )
        return self

    # Add pulsar support
    if self._federation_type == FederationEngine.PULSAR:
        from fate_arch.computing.spark import CSession
        from fate_arch.federation.pulsar import Federation

        if not self.is_computing_valid or not isinstance(
                self._computing_session, CSession
        ):
            raise RuntimeError(
                f"require computing with type {ComputingEngine.SPARK} valid"
            )

        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            pulsar_config=service_conf,
        )
        return self

    raise RuntimeError(f"{self._federation_type} not supported")
get_table(self, name, namespace)
Source code in fate_arch/session/_session.py
def get_table(self, name, namespace) -> typing.Union[StorageTableABC, None]:
    """Return the storage table ``namespace``/``name``, or ``None`` when it has no meta."""
    table_meta = Session.get_table_meta(name=name, namespace=namespace)
    if table_meta is not None:
        # open the table through a session of the engine named in its meta
        session = self._get_or_create_storage(storage_engine=table_meta.get_engine())
        return session.get_table(name=name, namespace=namespace)
    return None
get_table_meta(name, namespace) classmethod
Source code in fate_arch/session/_session.py
@classmethod
def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
    """Delegate to ``StorageSessionBase.get_table_meta`` and return its result."""
    return StorageSessionBase.get_table_meta(name=name, namespace=namespace)
persistent(computing_table, namespace, name, schema=None, part_of_data=None, engine=None, engine_address=None, store_type=None, token=None) classmethod
Source code in fate_arch/session/_session.py
@classmethod
def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
               engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
    """Delegate to ``StorageSessionBase.persistent`` with the same arguments."""
    forwarded = dict(
        computing_table=computing_table,
        namespace=namespace,
        name=name,
        schema=schema,
        part_of_data=part_of_data,
        engine=engine,
        engine_address=engine_address,
        store_type=store_type,
        token=token,
    )
    return StorageSessionBase.persistent(**forwarded)
storage(self, **kwargs)
Source code in fate_arch/session/_session.py
def storage(self, **kwargs):
    """Forward all keyword arguments to ``_get_or_create_storage`` and return its result."""
    return self._get_or_create_storage(**kwargs)
save_record(self, engine_type, engine_name, engine_session_id)
Source code in fate_arch/session/_session.py
@DB.connection_context()
def save_record(self, engine_type, engine_name, engine_session_id):
    """Insert a ``SessionRecord`` row linking an engine session to this manager session.

    An ``IntegrityError`` from the insert is logged as a warning and
    otherwise ignored.

    Raises:
        RuntimeError: the insert raised any other exception (chained as
            ``__cause__``) or did not report exactly one affected row.
    """
    self._logger.info(
        f"try to save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id}")
    session_record = SessionRecord()
    session_record.f_manager_session_id = self._session_id
    session_record.f_engine_type = engine_type
    session_record.f_engine_name = engine_name
    session_record.f_engine_session_id = engine_session_id
    # TODO: engine address
    session_record.f_engine_address = {}
    session_record.f_create_time = base_utils.current_timestamp()
    msg = f"save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id}"
    try:
        effect_count = session_record.save(force_insert=True)
    except peewee.IntegrityError as e:
        # Presumably the record already exists (duplicate key) — nothing was
        # inserted, so do not report success.
        self._logger.warning(e)
        return
    except Exception as e:
        raise RuntimeError(f"{msg} exception") from e
    if effect_count != 1:
        raise RuntimeError(f"{msg} failed")
    self._logger.info(
        f"save session record for manager {self._session_id}, {engine_type} {engine_name} {engine_session_id} successfully")
delete_session_record(self, engine_session_id)
Source code in fate_arch/session/_session.py
@DB.connection_context()
def delete_session_record(self, engine_session_id):
    """Delete the ``SessionRecord`` rows of ``engine_session_id`` and log the outcome."""
    delete_query = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id)
    deleted = delete_query.execute()
    if deleted > 0:
        self._logger.info(f"delete session {engine_session_id} record successfully")
        return
    # no row matched the engine session id
    self._logger.warning(f"delete session {engine_session_id} record failed")
query_sessions(cls, reverse=None, order_by=None, **kwargs) classmethod
Source code in fate_arch/session/_session.py
@classmethod
@DB.connection_context()
def query_sessions(cls, reverse=None, order_by=None, **kwargs):
    """Forward ``reverse``, ``order_by`` and the extra filters to ``SessionRecord.query``."""
    return SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)
get_session_from_record(self, **kwargs)
Source code in fate_arch/session/_session.py
@DB.connection_context()
def get_session_from_record(self, **kwargs):
    """Re-attach the engine sessions recorded for this manager session.

    Records whose session cannot be re-created are logged and deleted.
    ``kwargs`` are extra filters forwarded to ``query_sessions``.
    """
    self._logger.info(f"query by manager session id {self._session_id}")
    records = self.query_sessions(manager_session_id=self._session_id, **kwargs)
    self._logger.info([rec.f_engine_session_id for rec in records])
    for rec in records:
        try:
            recorded_id = rec.f_engine_session_id
            if rec.f_engine_type == EngineType.COMPUTING:
                self._init_computing_if_not_valid(computing_session_id=recorded_id)
            elif rec.f_engine_type == EngineType.STORAGE:
                # record=False: the row being replayed already exists
                self._get_or_create_storage(
                    storage_session_id=recorded_id,
                    storage_engine=rec.f_engine_name,
                    record=False,
                )
        except Exception as err:
            self._logger.error(err)
            self.delete_session_record(engine_session_id=rec.f_engine_session_id)
destroy_all_sessions(self, **kwargs)
Source code in fate_arch/session/_session.py
def destroy_all_sessions(self, **kwargs):
    """Re-attach recorded sessions, then destroy storage and computing sessions.

    ``kwargs`` are forwarded to ``get_session_from_record``.
    """
    manager_id = self._session_id
    log = self._logger
    log.info(f"start destroy manager session {manager_id} all sessions")
    self.get_session_from_record(**kwargs)
    # storage sessions go first, the computing session last
    self.destroy_storage_session()
    self.destroy_computing_session()
    log.info(f"finish destroy manager session {manager_id} all sessions")
destroy_computing_session(self)
Source code in fate_arch/session/_session.py
def destroy_computing_session(self):
    """Stop (or, failing that, kill) the computing session and delete its record.

    Does nothing when no computing session is attached. Failures are logged
    with their traceback and never raised; the session record is deleted
    either way.
    """
    if self.is_computing_valid:
        session_id = self._computing_session.session_id
        try:
            self._logger.info(f"try to destroy computing session {session_id}")
            try:
                self._computing_session.stop()
            except Exception:
                # graceful stop failed: fall back to a hard kill
                self._computing_session.kill()
            self._logger.info(f"destroy computing session {session_id} successfully")
        except Exception:
            # logger.exception records the traceback; the exception must not be
            # passed as a %-format argument because the message has no placeholder
            self._logger.exception(f"destroy computing session {session_id} failed")
        self.delete_session_record(engine_session_id=session_id)
destroy_storage_session(self)
Source code in fate_arch/session/_session.py
def destroy_storage_session(self):
    """Destroy every cached storage session and delete its record.

    A failure to destroy one session is logged with its traceback and does
    not stop the remaining sessions from being processed.
    """
    for session_id, session in self._storage_session.items():
        try:
            self._logger.info(f"try to destroy storage session {session_id}")
            session.destroy()
            self._logger.info(f"destroy storage session {session_id} successfully")
        except Exception:
            # logger.exception already attaches the active exception; passing it
            # as a %-format argument breaks formatting of this message
            self._logger.exception(f"destroy storage session {session_id} failed")
        self.delete_session_record(engine_session_id=session_id)
wait_remote_all_done(self, timeout=None)
Source code in fate_arch/session/_session.py
def wait_remote_all_done(self, timeout=None):
    """Call ``remote_status.wait_all_remote_done(timeout)``, logging the remote futures before and after."""
    LOGGER.info(f"remote futures: {remote_status._remote_futures}, waiting...")
    remote_status.wait_all_remote_done(timeout)
    LOGGER.info(f"remote futures: {remote_status._remote_futures}, all done")

Last update: 2021-11-15
Back to top