Session
Session API¶
Session is the context for using computing, storage and federation resources.
In most situations, users do not need to be concerned with how a Session is created: FATE-Flow is responsible for creating and initializing the Session when a Task is launched.
For those who want to use the computing, storage and federation APIs outside a FATE-Flow Task, the following is a short guide.
-
init Session
sess = Session()
sess.as_global()

# the following is optional, call only if needed
# init computing
sess.init_computing(...)
# init federation
sess.init_federation(...)
# there is no `init_storage`: storage sessions are created on demand by `sess.storage(...)`

# 2. calling specific api
computing = sess.computing
federation = sess.federation
# `storage` is a method (keyword arguments only); it gets or creates a storage session
storage = sess.storage()
-
The computing API also has a module-level shortcut:
from fate_arch.session import computing_session

computing_session.init(...)
computing_session.parallelize(...)
Detailed API¶
Session
¶
Source code in fate_arch/session/_session.py
class Session(object):
    """Manager session owning the computing, federation and storage sessions.

    Engine types come from ``engine_utils.get_engines()``. Every engine session
    created through this manager can be saved as a ``SessionRecord`` row. Those
    rows are used by ``destroy_all_sessions`` to re-attach and destroy them.
    Used as a context manager, leaving the ``with`` block calls
    ``destroy_all_sessions``.
    """

    __GLOBAL_SESSION = None

    @classmethod
    def get_global(cls):
        """Return the session registered by ``as_global`` (``None`` if none was registered)."""
        return cls.__GLOBAL_SESSION

    @classmethod
    def _as_global(cls, sess):
        cls.__GLOBAL_SESSION = sess

    def as_global(self):
        """Register this session as the global session and return it."""
        self._as_global(self)
        return self

    def __init__(self, session_id: str = None, options=None):
        """Create a manager session.

        :param session_id: id of this manager session; a ``uuid1`` string is generated when empty.
        :param options: optional dict; a non-None ``options["logger"]`` replaces the module ``LOGGER``.
        :raises RuntimeError: if no computing engine is configured.
        """
        if options is None:
            options = {}
        engines = engine_utils.get_engines()
        LOGGER.info(f"using engines: {engines}")
        computing_type = engines.get(EngineType.COMPUTING, None)
        if computing_type is None:
            raise RuntimeError("must set default engines on conf/service_conf.yaml")
        self._computing_type = computing_type
        self._federation_type = engines.get(EngineType.FEDERATION, None)
        self._storage_engine = engines.get(EngineType.STORAGE, None)
        self._computing_session: typing.Optional[CSessionABC] = None
        self._federation_session: typing.Optional[FederationABC] = None
        # storage sessions keyed by storage session id
        self._storage_session: typing.Dict[str, StorageSessionABC] = {}
        self._parties_info: typing.Optional[PartiesInfo] = None
        self._all_party_info: typing.List[Party] = []
        self._session_id = str(uuid.uuid1()) if not session_id else session_id
        logger = options.get("logger", None)
        self._logger = LOGGER if logger is None else logger
        self._logger.info(f"create manager session {self._session_id}")
        # init meta db
        init_database_tables()

    @property
    def session_id(self) -> str:
        return self._session_id

    def _open(self):
        return self

    def _close(self):
        self.destroy_all_sessions()

    def __enter__(self):
        return self._open()

    def __exit__(self, exc_type, exc_val, exc_tb):
        # log the in-flight exception; the falsy return value lets it propagate
        if exc_tb:
            self._logger.exception("", exc_info=(exc_type, exc_val, exc_tb))
        return self._close()

    def init_computing(self,
                       computing_session_id: str = None,
                       record: bool = True,
                       **kwargs):
        """Create the computing session for the configured computing engine.

        :param computing_session_id: id to use; generated from the manager id when empty.
        :param record: whether to save a ``SessionRecord`` for it.
        :param kwargs: ``options`` is forwarded to standalone/eggroll sessions.
        :return: ``self``.
        :raises RuntimeError: if a computing session already exists or the engine is unsupported.
        """
        computing_session_id = f"{self._session_id}_computing_{uuid.uuid1()}" if not computing_session_id else computing_session_id
        if self.is_computing_valid:
            raise RuntimeError(f"computing session already valid")
        if record:
            self.save_record(engine_type=EngineType.COMPUTING,
                             engine_name=self._computing_type,
                             engine_session_id=computing_session_id)
        if self._computing_type == ComputingEngine.STANDALONE:
            from fate_arch.computing.standalone import CSession
            self._computing_session = CSession(session_id=computing_session_id, options=kwargs.get("options", {}))
            return self
        if self._computing_type == ComputingEngine.EGGROLL:
            from fate_arch.computing.eggroll import CSession
            self._computing_session = CSession(session_id=computing_session_id, options=kwargs.get("options", {}))
            return self
        if self._computing_type in (ComputingEngine.SPARK, ComputingEngine.LINKIS_SPARK):
            # both spark flavours use the same CSession; `options` is not forwarded
            from fate_arch.computing.spark import CSession
            self._computing_session = CSession(session_id=computing_session_id)
            return self
        raise RuntimeError(f"{self._computing_type} not supported")

    def init_federation(
            self,
            federation_session_id: str,
            *,
            runtime_conf: typing.Optional[dict] = None,
            parties_info: typing.Optional[PartiesInfo] = None,
            service_conf: typing.Optional[dict] = None,
            record: bool = True,
    ):
        """Create the federation session for the configured federation engine.

        :param federation_session_id: id of the federation session.
        :param runtime_conf: job runtime conf. It builds ``parties_info`` when that is
            not given. When given, every party under ``runtime_conf["role"]`` is collected.
        :param parties_info: parties of the job; takes precedence over ``runtime_conf``.
        :param service_conf: engine service conf. Eggroll reads ``host``/``port``;
            rabbitmq/pulsar receive it as their engine config.
        :param record: whether to save a ``SessionRecord`` for it.
        :return: ``self``.
        :raises RuntimeError: if a federation session already exists, if neither
            ``parties_info`` nor ``runtime_conf`` is given, if the required computing
            session is missing, or if the engine is unsupported.
        """
        # validate before touching state, so a rejected call neither saves a record
        # nor overwrites the parties of the live federation session
        if self.is_federation_valid:
            raise RuntimeError("federation session already valid")
        if parties_info is None:
            if runtime_conf is None:
                raise RuntimeError(f"`party_info` and `runtime_conf` are both `None`")
            parties_info = PartiesInfo.from_conf(runtime_conf)
        if record:
            self.save_record(engine_type=EngineType.FEDERATION,
                             engine_name=self._federation_type,
                             engine_session_id=federation_session_id,
                             engine_runtime_conf={"runtime_conf": runtime_conf, "service_conf": service_conf})
        self._parties_info = parties_info
        if runtime_conf is not None:
            self._all_party_info = [Party(role, party_id)
                                    for role, party_ids in runtime_conf['role'].items()
                                    for party_id in party_ids]
        else:
            # NOTE(review): only `parties_info` was given, so the full party list is
            # not derived here and `_all_party_info` stays empty
            self._all_party_info = []

        if self._federation_type == FederationEngine.STANDALONE:
            from fate_arch.computing.standalone import CSession
            from fate_arch.federation.standalone import Federation
            if not self.is_computing_valid or not isinstance(self._computing_session, CSession):
                raise RuntimeError(f"require computing with type {ComputingEngine.STANDALONE} valid")
            self._federation_session = Federation(
                standalone_session=self._computing_session.get_standalone_session(),
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
            )
            return self
        if self._federation_type == FederationEngine.EGGROLL:
            from fate_arch.computing.eggroll import CSession
            from fate_arch.federation.eggroll import Federation
            if not self.is_computing_valid or not isinstance(self._computing_session, CSession):
                raise RuntimeError(f"require computing with type {ComputingEngine.EGGROLL} valid")
            self._federation_session = Federation(
                rp_ctx=self._computing_session.get_rpc(),
                rs_session_id=federation_session_id,
                party=parties_info.local_party,
                proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
            )
            return self
        if self._federation_type == FederationEngine.RABBITMQ:
            from fate_arch.federation.rabbitmq import Federation
            self._federation_session = Federation.from_conf(
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
                runtime_conf=runtime_conf,
                rabbitmq_config=service_conf,
            )
            return self
        if self._federation_type == FederationEngine.PULSAR:
            from fate_arch.federation.pulsar import Federation
            self._federation_session = Federation.from_conf(
                federation_session_id=federation_session_id,
                party=parties_info.local_party,
                runtime_conf=runtime_conf,
                pulsar_config=service_conf,
            )
            return self
        raise RuntimeError(f"{self._federation_type} not supported")

    def _get_or_create_storage(self,
                               storage_session_id=None,
                               storage_engine=None,
                               record: bool = True,
                               **kwargs) -> StorageSessionABC:
        """Return a cached storage session, or create one.

        Lookup order:
        1. the session cached under ``storage_session_id``;
        2. any cached session whose ``engine`` equals ``storage_engine``
           (default: the configured storage engine);
        3. a new ``StorageSession`` from ``fate_arch.storage.<engine module>``,
           optionally recorded, with ``kwargs["options"]`` forwarded.

        :raises NotImplementedError: if the storage engine is not supported.
        """
        import importlib

        storage_session_id = f"{self._session_id}_storage_{uuid.uuid1()}" if not storage_session_id else storage_session_id
        if storage_session_id in self._storage_session:
            return self._storage_session[storage_session_id]
        if storage_engine is None:
            storage_engine = self._storage_engine
        for session in self._storage_session.values():
            if storage_engine == session.engine:
                return session

        # storage engine -> submodule of fate_arch.storage that defines `StorageSession`
        engine_modules = {
            StorageEngine.EGGROLL: "eggroll",
            StorageEngine.STANDALONE: "standalone",
            StorageEngine.MYSQL: "mysql",
            StorageEngine.HDFS: "hdfs",
            StorageEngine.HIVE: "hive",
            StorageEngine.LINKIS_HIVE: "linkis_hive",
            StorageEngine.PATH: "path",
            StorageEngine.LOCALFS: "localfs",
            StorageEngine.API: "api",
        }
        module_name = engine_modules.get(storage_engine)
        if module_name is None:
            # checked before recording so no record is left for an impossible session
            raise NotImplementedError(f"can not be initialized with storage engine: {storage_engine}")
        if record:
            self.save_record(engine_type=EngineType.STORAGE,
                             engine_name=storage_engine,
                             engine_session_id=storage_session_id)
        storage_module = importlib.import_module(f"fate_arch.storage.{module_name}")
        storage_session = storage_module.StorageSession(session_id=storage_session_id,
                                                        options=kwargs.get("options", {}))
        self._storage_session[storage_session_id] = storage_session
        return storage_session

    def get_table(self, name, namespace, ignore_disable=False) -> typing.Union[StorageTableABC, None]:
        """Open a table through a storage session of the table's engine.

        :return: the table, or ``None`` when no meta exists for (name, namespace).
        :raises Exception: if the table is disabled and ``ignore_disable`` is False.
        """
        meta = Session.get_table_meta(name=name, namespace=namespace)
        if meta is None:
            return None
        if meta.get_disable() and not ignore_disable:
            raise Exception(f"table {namespace} {name} disable: {meta.get_disable()}")
        engine = meta.get_engine()
        storage_session = self._get_or_create_storage(storage_engine=engine)
        return storage_session.get_table(name=name, namespace=namespace)

    @classmethod
    def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
        """Return the table meta for (name, namespace) via ``StorageSessionBase``."""
        return StorageSessionBase.get_table_meta(name=name, namespace=namespace)

    @classmethod
    def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
                   engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
        """Delegate to ``StorageSessionBase.persistent`` with the same arguments."""
        return StorageSessionBase.persistent(computing_table=computing_table,
                                             namespace=namespace,
                                             name=name,
                                             schema=schema,
                                             part_of_data=part_of_data,
                                             engine=engine,
                                             engine_address=engine_address,
                                             store_type=store_type,
                                             token=token)

    @property
    def computing(self) -> CSessionABC:
        return self._computing_session

    @property
    def federation(self) -> FederationABC:
        return self._federation_session

    def storage(self, **kwargs):
        """Get or create a storage session; keyword arguments go to ``_get_or_create_storage``."""
        return self._get_or_create_storage(**kwargs)

    @property
    def parties(self):
        return self._parties_info

    @property
    def is_computing_valid(self):
        return self._computing_session is not None

    @property
    def is_federation_valid(self):
        return self._federation_session is not None

    @DB.connection_context()
    def save_record(self, engine_type, engine_name, engine_session_id, engine_runtime_conf=None):
        """Insert a ``SessionRecord`` linking an engine session to this manager session.

        A ``peewee.IntegrityError`` on insert is logged and tolerated.

        :raises RuntimeError: if the insert fails otherwise or does not affect exactly one row.
        """
        msg = f"save session record for manager {self._session_id}, {engine_type} {engine_name} " \
              f"{engine_session_id}"
        self._logger.info(f"try to {msg}")
        session_record = SessionRecord()
        session_record.f_manager_session_id = self._session_id
        session_record.f_engine_type = engine_type
        session_record.f_engine_name = engine_name
        session_record.f_engine_session_id = engine_session_id
        session_record.f_engine_address = engine_runtime_conf if engine_runtime_conf else {}
        session_record.f_create_time = base_utils.current_timestamp()
        try:
            effect_count = session_record.save(force_insert=True)
        except peewee.IntegrityError as e:
            self._logger.warning(f"{msg} hit integrity error, ignored: {e}")
            return
        except Exception as e:
            raise RuntimeError(f"{msg} exception: {e}") from e
        if effect_count != 1:
            raise RuntimeError(f"{msg} failed")
        self._logger.info(f"{msg} successfully")

    @DB.connection_context()
    def delete_session_record(self, engine_session_id, manager_session_id=None):
        """Delete records of an engine session, optionally limited to one manager session."""
        if not manager_session_id:
            rows = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id).execute()
        else:
            rows = SessionRecord.delete().where(SessionRecord.f_engine_session_id == engine_session_id,
                                                SessionRecord.f_manager_session_id == manager_session_id).execute()
        if rows > 0:
            self._logger.info(f"delete session {engine_session_id} record successfully")
        else:
            self._logger.warning(f"delete session {engine_session_id} record failed")

    @classmethod
    @DB.connection_context()
    def query_sessions(cls, reverse=None, order_by=None, **kwargs):
        """Query ``SessionRecord`` rows; return ``[]`` (after logging) if the query raises."""
        try:
            return SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)
        except Exception as e:
            # best effort, but never swallow KeyboardInterrupt/SystemExit
            LOGGER.warning(f"query session records failed: {e}")
            return []

    @DB.connection_context()
    def get_session_from_record(self, **kwargs):
        """Re-attach the engine sessions recorded for this manager session.

        A record whose session cannot be restored is deleted.
        """
        self._logger.info(f"query by manager session id {self._session_id}")
        session_records = self.query_sessions(manager_session_id=self.session_id, **kwargs)
        self._logger.info([session_record.f_engine_session_id for session_record in session_records])
        for session_record in session_records:
            try:
                engine_session_id = session_record.f_engine_session_id
                if session_record.f_engine_type == EngineType.COMPUTING:
                    self._init_computing_if_not_valid(computing_session_id=engine_session_id)
                elif session_record.f_engine_type == EngineType.STORAGE:
                    self._get_or_create_storage(storage_session_id=engine_session_id,
                                                storage_engine=session_record.f_engine_name,
                                                record=False)
                elif session_record.f_engine_type == EngineType.FEDERATION:
                    self._logger.info(f"engine runtime conf: {session_record.f_engine_address}")
                    self._init_federation_if_not_valid(federation_session_id=engine_session_id,
                                                       engine_runtime_conf=session_record.f_engine_address)
            except Exception as e:
                self._logger.warning(
                    f"restore session {session_record.f_engine_session_id} from record failed: {e}")
                self.delete_session_record(engine_session_id=session_record.f_engine_session_id)

    def _init_computing_if_not_valid(self, computing_session_id):
        """Init computing from a record; False if a different computing session is already active."""
        if not self.is_computing_valid:
            self.init_computing(computing_session_id=computing_session_id, record=False)
            return True
        elif self._computing_session.session_id != computing_session_id:
            self._logger.warning(
                f"manager session had computing session {self._computing_session.session_id} "
                f"different with query from db session {computing_session_id}")
            return False
        else:
            # already exists
            return True

    def _init_federation_if_not_valid(self, federation_session_id, engine_runtime_conf):
        """Init federation from a record; False on failure or if a different session is active."""
        if not self.is_federation_valid:
            try:
                self._logger.info(f"init federation session {federation_session_id} type {self._federation_type}")
                self.init_federation(federation_session_id=federation_session_id,
                                     runtime_conf=engine_runtime_conf.get("runtime_conf"),
                                     service_conf=engine_runtime_conf.get("service_conf"),
                                     record=False)
                self._logger.info(f"init federation session {federation_session_id} type {self._federation_type} done")
                return True
            except Exception as e:
                self._logger.warning(
                    f"init federation session {federation_session_id} type {self._federation_type} failed: {e}")
                return False
        elif self._federation_session.session_id != federation_session_id:
            self._logger.warning(
                f"manager session had federation session {self._federation_session.session_id} different with query from db session {federation_session_id}")
            return False
        else:
            # already exists
            return True

    def destroy_all_sessions(self, **kwargs):
        """Re-attach recorded sessions, then destroy federation, storage and computing (in that order)."""
        self._logger.info(f"start destroy manager session {self._session_id} all sessions")
        self.get_session_from_record(**kwargs)
        self.destroy_federation_session()
        self.destroy_storage_session()
        self.destroy_computing_session()
        self._logger.info(f"finish destroy manager session {self._session_id} all sessions")

    def destroy_computing_session(self):
        """Destroy the computing session (if any) and delete its record; destroy errors are logged."""
        if self.is_computing_valid:
            session_id = self._computing_session.session_id
            try:
                self._logger.info(f"try to destroy computing session {session_id}")
                self._computing_session.destroy()
            except Exception as e:
                # pre-formatted: passing `e` as a logging arg without a placeholder
                # makes logging report a formatting error instead of this message
                self._logger.exception(f"destroy computing session {session_id} failed: {e}")
            self.delete_session_record(engine_session_id=session_id)
            self._computing_session = None

    def destroy_storage_session(self):
        """Destroy every cached storage session, delete their records and clear the cache."""
        for session_id, session in self._storage_session.items():
            try:
                self._logger.info(f"try to destroy storage session {session_id}")
                session.destroy()
                self._logger.info(f"destroy storage session {session_id} successfully")
            except Exception as e:
                self._logger.exception(f"destroy storage session {session_id} failed: {e}")
            self.delete_session_record(engine_session_id=session_id)
        self._storage_session = {}

    def destroy_federation_session(self):
        """Destroy the federation session (skipped for role "local") and delete its record."""
        if self.is_federation_valid:
            try:
                if self._parties_info.local_party.role != "local":
                    self._logger.info(
                        f"try to destroy federation session {self._federation_session.session_id} type"
                        f" {EngineType.FEDERATION} role {self._parties_info.local_party.role}")
                    self._federation_session.destroy(parties=self._all_party_info)
                    self._logger.info(f"destroy federation session {self._federation_session.session_id} done")
            except Exception as e:
                self._logger.info(f"destroy federation failed: {e}")
            self.delete_session_record(engine_session_id=self._federation_session.session_id,
                                       manager_session_id=self.session_id)
            self._federation_session = None

    def wait_remote_all_done(self, timeout=None):
        """Wait via ``remote_status.wait_all_remote_done(timeout)``, logging pending futures around it."""
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, waiting...")
        remote_status.wait_all_remote_done(timeout)
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, all done")
session_id: str
property
readonly
¶
computing: CSessionABC
property
readonly
¶
federation: FederationABC
property
readonly
¶
parties
property
readonly
¶
is_computing_valid
property
readonly
¶
is_federation_valid
property
readonly
¶
get_global()
classmethod
¶
Source code in fate_arch/session/_session.py
@classmethod
def get_global(cls):
    """Return the session currently registered as global (``None`` until one is registered)."""
    global_session = cls.__GLOBAL_SESSION
    return global_session
as_global(self)
¶
Source code in fate_arch/session/_session.py
def as_global(self):
    """Register this session via ``_as_global`` and hand it back for chaining."""
    session = self
    session._as_global(session)
    return session
__init__(self, session_id=None, options=None)
special
¶
Source code in fate_arch/session/_session.py
def __init__(self, session_id: str = None, options=None):
    """Create a manager session.

    :param session_id: id of this manager session; a ``uuid1`` string is generated when empty.
    :param options: optional dict; a non-None ``options["logger"]`` replaces the module ``LOGGER``.
    :raises RuntimeError: if no computing engine is configured.
    """
    if options is None:
        options = {}
    engines = engine_utils.get_engines()
    LOGGER.info(f"using engines: {engines}")
    computing_type = engines.get(EngineType.COMPUTING, None)
    if computing_type is None:
        raise RuntimeError("must set default engines on conf/service_conf.yaml")
    self._computing_type = computing_type
    self._federation_type = engines.get(EngineType.FEDERATION, None)
    self._storage_engine = engines.get(EngineType.STORAGE, None)
    self._computing_session: typing.Optional[CSessionABC] = None
    self._federation_session: typing.Optional[FederationABC] = None
    # storage sessions keyed by storage session id (typing.Dict needs key and value types)
    self._storage_session: typing.Dict[str, StorageSessionABC] = {}
    self._parties_info: typing.Optional[PartiesInfo] = None
    self._all_party_info: typing.List[Party] = []
    self._session_id = str(uuid.uuid1()) if not session_id else session_id
    logger = options.get("logger", None)
    self._logger = LOGGER if logger is None else logger
    self._logger.info(f"create manager session {self._session_id}")
    # init meta db
    init_database_tables()
__enter__(self)
special
¶
Source code in fate_arch/session/_session.py
def __enter__(self):
    """Enter the ``with`` block, yielding whatever ``_open`` returns."""
    opened = self._open()
    return opened
__exit__(self, exc_type, exc_val, exc_tb)
special
¶
Source code in fate_arch/session/_session.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Log an in-flight exception (if any), then delegate to ``_close``.

    The return value of ``_close`` is returned, so a falsy value lets the exception propagate.
    """
    has_traceback = bool(exc_tb)
    if has_traceback:
        self._logger.exception("", exc_info=(exc_type, exc_val, exc_tb))
    close_result = self._close()
    return close_result
init_computing(self, computing_session_id=None, record=True, **kwargs)
¶
Source code in fate_arch/session/_session.py
def init_computing(self,
                   computing_session_id: str = None,
                   record: bool = True,
                   **kwargs):
    """Create the computing session for the configured computing engine.

    :param computing_session_id: id to use; generated from the manager id when empty.
    :param record: whether to save a ``SessionRecord`` for it.
    :param kwargs: ``options`` is forwarded to standalone/eggroll sessions.
    :return: ``self``.
    :raises RuntimeError: if a computing session already exists or the engine is unsupported.
    """
    if not computing_session_id:
        computing_session_id = f"{self._session_id}_computing_{uuid.uuid1()}"
    if self.is_computing_valid:
        raise RuntimeError(f"computing session already valid")
    if record:
        self.save_record(engine_type=EngineType.COMPUTING,
                         engine_name=self._computing_type,
                         engine_session_id=computing_session_id)

    engine = self._computing_type
    if engine == ComputingEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession
        self._computing_session = CSession(session_id=computing_session_id, options=kwargs.get("options", {}))
    elif engine == ComputingEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession
        self._computing_session = CSession(session_id=computing_session_id, options=kwargs.get("options", {}))
    elif engine in (ComputingEngine.SPARK, ComputingEngine.LINKIS_SPARK):
        # both spark flavours share one CSession; `options` is not forwarded
        from fate_arch.computing.spark import CSession
        self._computing_session = CSession(session_id=computing_session_id)
    else:
        raise RuntimeError(f"{self._computing_type} not supported")
    return self
init_federation(self, federation_session_id, *, runtime_conf=None, parties_info=None, service_conf=None, record=True)
¶
Source code in fate_arch/session/_session.py
def init_federation(
        self,
        federation_session_id: str,
        *,
        runtime_conf: typing.Optional[dict] = None,
        parties_info: typing.Optional[PartiesInfo] = None,
        service_conf: typing.Optional[dict] = None,
        record: bool = True,
):
    """Create the federation session for the configured federation engine.

    :param federation_session_id: id of the federation session.
    :param runtime_conf: job runtime conf. It builds ``parties_info`` when that is not
        given. When given, every party under ``runtime_conf["role"]`` is collected.
    :param parties_info: parties of the job; takes precedence over ``runtime_conf``.
    :param service_conf: engine service conf. Eggroll reads ``host``/``port``;
        rabbitmq/pulsar receive it as their engine config.
    :param record: whether to save a ``SessionRecord`` for it.
    :return: ``self``.
    :raises RuntimeError: if a federation session already exists, if neither
        ``parties_info`` nor ``runtime_conf`` is given, if the required computing session
        is missing, or if the engine is unsupported.
    """
    # validate before touching state, so a rejected call neither saves a record
    # nor overwrites the parties of the live federation session
    if self.is_federation_valid:
        raise RuntimeError("federation session already valid")
    if parties_info is None:
        if runtime_conf is None:
            raise RuntimeError(f"`party_info` and `runtime_conf` are both `None`")
        parties_info = PartiesInfo.from_conf(runtime_conf)
    if record:
        self.save_record(engine_type=EngineType.FEDERATION,
                         engine_name=self._federation_type,
                         engine_session_id=federation_session_id,
                         engine_runtime_conf={"runtime_conf": runtime_conf, "service_conf": service_conf})
    self._parties_info = parties_info
    if runtime_conf is not None:
        self._all_party_info = [Party(role, party_id)
                                for role, party_ids in runtime_conf['role'].items()
                                for party_id in party_ids]
    else:
        # NOTE(review): only `parties_info` was given, so the full party list is not
        # derived here and `_all_party_info` stays empty
        self._all_party_info = []

    if self._federation_type == FederationEngine.STANDALONE:
        from fate_arch.computing.standalone import CSession
        from fate_arch.federation.standalone import Federation
        if not self.is_computing_valid or not isinstance(self._computing_session, CSession):
            raise RuntimeError(f"require computing with type {ComputingEngine.STANDALONE} valid")
        self._federation_session = Federation(
            standalone_session=self._computing_session.get_standalone_session(),
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
        )
        return self
    if self._federation_type == FederationEngine.EGGROLL:
        from fate_arch.computing.eggroll import CSession
        from fate_arch.federation.eggroll import Federation
        if not self.is_computing_valid or not isinstance(self._computing_session, CSession):
            raise RuntimeError(f"require computing with type {ComputingEngine.EGGROLL} valid")
        self._federation_session = Federation(
            rp_ctx=self._computing_session.get_rpc(),
            rs_session_id=federation_session_id,
            party=parties_info.local_party,
            proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",
        )
        return self
    if self._federation_type == FederationEngine.RABBITMQ:
        from fate_arch.federation.rabbitmq import Federation
        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            rabbitmq_config=service_conf,
        )
        return self
    if self._federation_type == FederationEngine.PULSAR:
        from fate_arch.federation.pulsar import Federation
        self._federation_session = Federation.from_conf(
            federation_session_id=federation_session_id,
            party=parties_info.local_party,
            runtime_conf=runtime_conf,
            pulsar_config=service_conf,
        )
        return self
    raise RuntimeError(f"{self._federation_type} not supported")
get_table(self, name, namespace, ignore_disable=False)
¶
Source code in fate_arch/session/_session.py
def get_table(self, name, namespace, ignore_disable=False) -> typing.Union[StorageTableABC, None]:
    """Open a table through a storage session matching the table's engine.

    :return: the table, or ``None`` when no meta exists for (name, namespace).
    :raises Exception: if the table is disabled and ``ignore_disable`` is False.
    """
    table_meta = Session.get_table_meta(name=name, namespace=namespace)
    if table_meta is None:
        return None
    disabled = table_meta.get_disable()
    if disabled and not ignore_disable:
        raise Exception(f"table {namespace} {name} disable: {disabled}")
    session = self._get_or_create_storage(storage_engine=table_meta.get_engine())
    return session.get_table(name=name, namespace=namespace)
get_table_meta(name, namespace)
classmethod
¶
Source code in fate_arch/session/_session.py
@classmethod
def get_table_meta(cls, name, namespace) -> typing.Union[StorageTableMetaABC, None]:
    """Look up the storage table meta for (name, namespace) via ``StorageSessionBase``."""
    return StorageSessionBase.get_table_meta(name=name, namespace=namespace)
persistent(computing_table, namespace, name, schema=None, part_of_data=None, engine=None, engine_address=None, store_type=None, token=None)
classmethod
¶
Source code in fate_arch/session/_session.py
@classmethod
def persistent(cls, computing_table: CTableABC, namespace, name, schema=None, part_of_data=None,
               engine=None, engine_address=None, store_type=None, token: typing.Dict = None) -> StorageTableMetaABC:
    """Persist ``computing_table``; all arguments are forwarded to ``StorageSessionBase.persistent``."""
    return StorageSessionBase.persistent(
        computing_table=computing_table, namespace=namespace, name=name,
        schema=schema, part_of_data=part_of_data,
        engine=engine, engine_address=engine_address, store_type=store_type,
        token=token,
    )
storage(self, **kwargs)
¶
Source code in fate_arch/session/_session.py
def storage(self, **kwargs):
    """Get or create a storage session; keyword arguments go to ``_get_or_create_storage``."""
    storage_session = self._get_or_create_storage(**kwargs)
    return storage_session
save_record(self, engine_type, engine_name, engine_session_id, engine_runtime_conf=None)
¶
Source code in fate_arch/session/_session.py
@DB.connection_context()
def save_record(self, engine_type, engine_name, engine_session_id, engine_runtime_conf=None):
    """Insert a ``SessionRecord`` linking an engine session to this manager session.

    A ``peewee.IntegrityError`` on insert is logged and tolerated.

    :raises RuntimeError: if the insert fails otherwise or does not affect exactly one row.
    """
    # not storage-specific: this records computing, federation and storage sessions alike
    msg = f"save session record for manager {self._session_id}, {engine_type} {engine_name} " \
          f"{engine_session_id}"
    self._logger.info(f"try to {msg}")
    session_record = SessionRecord()
    session_record.f_manager_session_id = self._session_id
    session_record.f_engine_type = engine_type
    session_record.f_engine_name = engine_name
    session_record.f_engine_session_id = engine_session_id
    session_record.f_engine_address = engine_runtime_conf if engine_runtime_conf else {}
    session_record.f_create_time = base_utils.current_timestamp()
    try:
        effect_count = session_record.save(force_insert=True)
    except peewee.IntegrityError as e:
        self._logger.warning(f"{msg} hit integrity error, ignored: {e}")
        return
    except Exception as e:
        raise RuntimeError(f"{msg} exception: {e}") from e
    # checked outside the try so this error is not re-wrapped by the handler above
    if effect_count != 1:
        raise RuntimeError(f"{msg} failed")
    self._logger.info(f"{msg} successfully")
delete_session_record(self, engine_session_id, manager_session_id=None)
¶
Source code in fate_arch/session/_session.py
@DB.connection_context()
def delete_session_record(self, engine_session_id, manager_session_id=None):
    """Delete the records of an engine session, optionally limited to one manager session."""
    conditions = [SessionRecord.f_engine_session_id == engine_session_id]
    if manager_session_id:
        conditions.append(SessionRecord.f_manager_session_id == manager_session_id)
    deleted_rows = SessionRecord.delete().where(*conditions).execute()
    if deleted_rows <= 0:
        self._logger.warning(f"delete session {engine_session_id} record failed")
    else:
        self._logger.info(f"delete session {engine_session_id} record successfully")
query_sessions(cls, reverse=None, order_by=None, **kwargs)
classmethod
¶
Source code in fate_arch/session/_session.py
@classmethod
@DB.connection_context()
def query_sessions(cls, reverse=None, order_by=None, **kwargs):
    """Query ``SessionRecord`` rows matching ``kwargs``.

    :return: the query result, or ``[]`` (after logging a warning) if the query raises.
    """
    try:
        return SessionRecord.query(reverse=reverse, order_by=order_by, **kwargs)
    except Exception as e:
        # best effort, but do not swallow KeyboardInterrupt/SystemExit or hide the cause
        LOGGER.warning(f"query session records failed: {e}")
        return []
get_session_from_record(self, **kwargs)
¶
Source code in fate_arch/session/_session.py
@DB.connection_context()
def get_session_from_record(self, **kwargs):
    """Re-attach the engine sessions recorded for this manager session.

    Computing, storage and federation records are restored through their
    respective init helpers. A record whose session cannot be restored is
    deleted after logging a warning.
    """
    self._logger.info(f"query by manager session id {self._session_id}")
    session_records = self.query_sessions(manager_session_id=self.session_id, **kwargs)
    self._logger.info([session_record.f_engine_session_id for session_record in session_records])
    for session_record in session_records:
        try:
            engine_session_id = session_record.f_engine_session_id
            if session_record.f_engine_type == EngineType.COMPUTING:
                self._init_computing_if_not_valid(computing_session_id=engine_session_id)
            elif session_record.f_engine_type == EngineType.STORAGE:
                self._get_or_create_storage(storage_session_id=engine_session_id,
                                            storage_engine=session_record.f_engine_name,
                                            record=False)
            elif session_record.f_engine_type == EngineType.FEDERATION:
                self._logger.info(f"engine runtime conf: {session_record.f_engine_address}")
                self._init_federation_if_not_valid(federation_session_id=engine_session_id,
                                                   engine_runtime_conf=session_record.f_engine_address)
        except Exception as e:
            # a failed restore drops its record; log at warning with the session id for context
            self._logger.warning(
                f"restore session {session_record.f_engine_session_id} from record failed: {e}")
            self.delete_session_record(engine_session_id=session_record.f_engine_session_id)
destroy_all_sessions(self, **kwargs)
¶
Source code in fate_arch/session/_session.py
def destroy_all_sessions(self, **kwargs):
    """Re-attach recorded sessions, then destroy federation, storage and computing in that order."""
    self._logger.info(f"start destroy manager session {self._session_id} all sessions")
    # sessions created elsewhere under this manager id are re-attached from db first
    self.get_session_from_record(**kwargs)
    for destroy in (self.destroy_federation_session,
                    self.destroy_storage_session,
                    self.destroy_computing_session):
        destroy()
    self._logger.info(f"finish destroy manager session {self._session_id} all sessions")
destroy_computing_session(self)
¶
Source code in fate_arch/session/_session.py
def destroy_computing_session(self):
    """Destroy the computing session (if any) and delete its session record.

    A failure in ``destroy()`` is logged and does not stop the cleanup.
    """
    if self.is_computing_valid:
        session_id = self._computing_session.session_id
        try:
            self._logger.info(f"try to destroy computing session {session_id}")
            self._computing_session.destroy()
        except Exception as e:
            # pre-formatted: passing `e` as a logging arg without a placeholder
            # makes logging report a formatting error instead of this message
            self._logger.exception(f"destroy computing session {session_id} failed: {e}")
        self.delete_session_record(engine_session_id=session_id)
        self._computing_session = None
destroy_storage_session(self)
¶
Source code in fate_arch/session/_session.py
def destroy_storage_session(self):
    """Destroy every cached storage session, delete their records and clear the cache.

    A failure in one session's ``destroy()`` is logged and the rest are still processed.
    """
    for session_id, session in self._storage_session.items():
        try:
            self._logger.info(f"try to destroy storage session {session_id}")
            session.destroy()
            self._logger.info(f"destroy storage session {session_id} successfully")
        except Exception as e:
            # pre-formatted: an extra positional arg without a placeholder breaks logging formatting
            self._logger.exception(f"destroy storage session {session_id} failed: {e}")
        self.delete_session_record(engine_session_id=session_id)
    self._storage_session = {}
destroy_federation_session(self)
¶
Source code in fate_arch/session/_session.py
def destroy_federation_session(self):
    """Destroy the federation session and delete its record for this manager session.

    The remote ``destroy`` is skipped when the local party's role is "local".
    Any error during it is logged at info level and cleanup continues.
    """
    if not self.is_federation_valid:
        return
    federation = self._federation_session
    try:
        role = self._parties_info.local_party.role
        if role != "local":
            self._logger.info(
                f"try to destroy federation session {federation.session_id} type"
                f" {EngineType.FEDERATION} role {role}")
            federation.destroy(parties=self._all_party_info)
            self._logger.info(f"destroy federation session {federation.session_id} done")
    except Exception as e:
        self._logger.info(f"destroy federation failed: {e}")
    self.delete_session_record(engine_session_id=federation.session_id,
                               manager_session_id=self.session_id)
    self._federation_session = None
wait_remote_all_done(self, timeout=None)
¶
Source code in fate_arch/session/_session.py
def wait_remote_all_done(self, timeout=None):
    """Wait on ``remote_status.wait_all_remote_done(timeout)``, logging pending futures before and after."""
    def _log(stage):
        LOGGER.info(f"remote futures: {remote_status._remote_futures}, {stage}")

    _log("waiting...")
    remote_status.wait_all_remote_done(timeout)
    _log("all done")
Last updated:
2021-11-15