Session is the context to use computing, storage and federation resouraces.
At most situation, users should not concern how Session is create.
FATE-Flow is responsed to create and init Session when Task launched.
For those who want to use computing, storage and federation api outside FATE-Flow Task,
flowing is a short guide.
init Session
sess=Session()sess.as_global()# flowing is optional, call if needed# init computingsess.init_computing(...)# init federationsess.init_federation(...)# init federationsess.init_storage(...)
def__init__(self,session_id:str=None,options=None):ifoptionsisNone:options={}engines=engine_utils.get_engines()LOGGER.info(f"using engines: {engines}")computing_type=engines.get(EngineType.COMPUTING,None)ifcomputing_typeisNone:raiseRuntimeError(f"must set default engines on conf/service_conf.yaml")self._computing_type=engines.get(EngineType.COMPUTING,None)self._federation_type=engines.get(EngineType.FEDERATION,None)self._storage_engine=engines.get(EngineType.STORAGE,None)self._computing_session:typing.Optional[CSessionABC]=Noneself._federation_session:typing.Optional[FederationABC]=Noneself._storage_session:typing.Dict[StorageSessionABC]={}self._parties_info:typing.Optional[PartiesInfo]=Noneself._all_party_info:typing.List[Party]=[]self._session_id=str(uuid.uuid1())ifnotsession_idelsesession_idself._logger=LOGGERifoptions.get("logger",None)isNoneelseoptions.get("logger",None)self._logger.info(f"create manager session {self._session_id}")# init meta dbinit_database_tables()
definit_federation(self,federation_session_id:str,*,runtime_conf:typing.Optional[dict]=None,parties_info:typing.Optional[PartiesInfo]=None,service_conf:typing.Optional[dict]=None,record:bool=True,):ifrecord:self.save_record(engine_type=EngineType.FEDERATION,engine_name=self._federation_type,engine_session_id=federation_session_id,engine_runtime_conf={"runtime_conf":runtime_conf,"service_conf":service_conf})ifparties_infoisNone:ifruntime_confisNone:raiseRuntimeError(f"`party_info` and `runtime_conf` are both `None`")parties_info=PartiesInfo.from_conf(runtime_conf)self._parties_info=parties_infoself._all_party_info=[Party(k,p)fork,vinruntime_conf['role'].items()forpinv]ifself.is_federation_valid:raiseRuntimeError("federation session already valid")ifself._federation_type==FederationEngine.STANDALONE:fromfate_arch.computing.standaloneimportCSessionfromfate_arch.federation.standaloneimportFederationifnotself.is_computing_validornotisinstance(self._computing_session,CSession):raiseRuntimeError(f"require computing with type {ComputingEngine.STANDALONE} valid")self._federation_session=Federation(standalone_session=self._computing_session.get_standalone_session(),federation_session_id=federation_session_id,party=parties_info.local_party,)returnselfifself._federation_type==FederationEngine.EGGROLL:fromfate_arch.computing.eggrollimportCSessionfromfate_arch.federation.eggrollimportFederationifnotself.is_computing_validornotisinstance(self._computing_session,CSession):raiseRuntimeError(f"require computing with type {ComputingEngine.EGGROLL} valid")self._federation_session=Federation(rp_ctx=self._computing_session.get_rpc(),rs_session_id=federation_session_id,party=parties_info.local_party,proxy_endpoint=f"{service_conf['host']}:{service_conf['port']}",)returnselfifself._federation_type==FederationEngine.RABBITMQ:fromfate_arch.federation.rabbitmqimportFederationself._federation_session=Federation.from_conf(federation_session_id=federation_session_id,party=parties_info.local_party,runtime_conf=runtime_conf,rabbitmq_config=service_conf,)returnself# Add pulsar supportifself._federation_type==FederationEngine.PULSAR:fromfate_arch.federation.pulsarimportFederationself._federation_session=Federation.from_conf(federation_session_id=federation_session_id,party=parties_info.local_party,runtime_conf=runtime_conf,pulsar_config=service_conf,)returnselfraiseRuntimeError(f"{self._federation_type} not supported")
@DB.connection_context()defsave_record(self,engine_type,engine_name,engine_session_id,engine_runtime_conf=None):self._logger.info(f"try to save session record for manager {self._session_id}, {engine_type}{engine_name}"f" {engine_session_id}")session_record=SessionRecord()session_record.f_manager_session_id=self._session_idsession_record.f_engine_type=engine_typesession_record.f_engine_name=engine_namesession_record.f_engine_session_id=engine_session_idsession_record.f_engine_address=engine_runtime_confifengine_runtime_confelse{}session_record.f_create_time=base_utils.current_timestamp()msg=f"save storage session record for manager {self._session_id}, {engine_type}{engine_name} " \
f"{engine_session_id}"try:effect_count=session_record.save(force_insert=True)ifeffect_count!=1:raiseRuntimeError(f"{msg} failed")exceptpeewee.IntegrityErrorase:LOGGER.warning(e)exceptExceptionase:raiseRuntimeError(f"{msg} exception",e)self._logger.info(f"save session record for manager {self._session_id}, {engine_type}{engine_name} "f"{engine_session_id} successfully")
@DB.connection_context()defdelete_session_record(self,engine_session_id,manager_session_id=None):ifnotmanager_session_id:rows=SessionRecord.delete().where(SessionRecord.f_engine_session_id==engine_session_id).execute()else:rows=SessionRecord.delete().where(SessionRecord.f_engine_session_id==engine_session_id,SessionRecord.f_manager_session_id==manager_session_id).execute()ifrows>0:self._logger.info(f"delete session {engine_session_id} record successfully")else:self._logger.warning(f"delete session {engine_session_id} record failed")
@DB.connection_context()defget_session_from_record(self,**kwargs):self._logger.info(f"query by manager session id {self._session_id}")session_records=self.query_sessions(manager_session_id=self.session_id,**kwargs)self._logger.info([session_record.f_engine_session_idforsession_recordinsession_records])forsession_recordinsession_records:try:engine_session_id=session_record.f_engine_session_idifsession_record.f_engine_type==EngineType.COMPUTING:self._init_computing_if_not_valid(computing_session_id=engine_session_id)elifsession_record.f_engine_type==EngineType.STORAGE:self._get_or_create_storage(storage_session_id=engine_session_id,storage_engine=session_record.f_engine_name,record=False)elifsession_record.f_engine_type==EngineType.FEDERATION:self._logger.info(f"engine runtime conf: {session_record.f_engine_address}")self._init_federation_if_not_valid(federation_session_id=engine_session_id,engine_runtime_conf=session_record.f_engine_address)exceptExceptionase:self._logger.info(e)self.delete_session_record(engine_session_id=session_record.f_engine_session_id)