module fundit::task_portfolioPerformance

use fundit::sqlUtilities;
use fundit::operationDataPuller;
use fundit::performanceDataPuller;
use fundit::portfolioDataPuller;
use fundit::dataSaver;
use fundit::navCalculator;
use fundit::returnCalculator;
use fundit::indicatorCalculator;

/*
 *  Calculate the historical NAV of portfolios (nothing is written to the database).
 *
 *  @param portfolio_ids : portfolio IDs; when empty the full set is processed
 *                         (not recommended, it may consume a lot of memory)
 *  @param updatetime    : NAV update time of the held securities; when omitted the
 *                         whole history is processed
 *
 *  @return : portfolio_id, price_date, ret, nav
 *
 *  Example: calPortfolioNAV([143109, 145041]);
 *           calPortfolioNAV([143109, 145041], 2024.10.28);
 */
def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) {

    // portfolio_ids=[364743, 364744];
    // updatetime=1900.01.01;

    port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true);

    tb_nav = cal_portfolio_nav(port_info);

    return tb_nav;
}

/*
 *  Calculate historical monthly returns and indicators of portfolios
 *  (nothing is written to the database).
 *
 *  @param navs : NEED COLUMNS portfolio_id, price_date, ret, nav
 *
 *  @return : the result of cal_monthly_indicators('PF', 'PBI', ...);
 *            nothing (void) when navs is void or empty
 *
 *  Example: calPortfolioPerformance(calPortfolioNAV([143109, 145041]));
 */
def calPortfolioPerformance(navs) {

    if(navs.isVoid() || navs.size() == 0) return;

    // work on a local table so the in-place renames below are applied to tb_navs
    tb_navs = navs;
    tb_navs.rename!(['portfolio_id'], ['entity_id']);

    port_ids = EXEC DISTINCT entity_id from tb_navs;
    port_info = get_entity_info('PF', port_ids);

    // cal_monthly_returns_by_nav adds the initial NAV of the inception month when needed,
    // which is more convenient than using the ret column that navs may already carry
    tb_navs.rename!('nav', 'cumulative_nav');
    tb_month_ret = cal_monthly_returns_by_nav(port_info, tb_navs);
    tb_month_ret.rename!('cumulative_nav', 'nav');

    indicators = cal_monthly_indicators('PF', 'PBI', tb_month_ret);

    return indicators;
}

/*
 *  Calculate historical monthly returns and indicators for any entity type
 *  (nothing is written to the database).
 *
 *  @param entity_type : entity type code passed on to get_entity_info and
 *                       cal_monthly_indicators (this file uses 'PF' and 'FA')
 *  @param navs        : NEED COLUMNS entity_id, price_date, ret, nav
 *
 *  @return : the result of cal_monthly_indicators(entity_type, 'PBI', ...);
 *            nothing (void) when navs is void or empty
 *
 *  Example: calEntityPerformance('PF', calPortfolioNAV([143109, 145041]));
 */
def calEntityPerformance(entity_type, navs) {

    if(navs.isVoid() || navs.size() == 0) return;

    // work on a local table so the in-place renames below are applied to tb_navs
    tb_navs = navs;
    //tb_navs.rename!(['portfolio_id'], ['entity_id']);

    entity_ids = EXEC DISTINCT entity_id from tb_navs;
    entity_info = get_entity_info(entity_type, entity_ids);

    // cal_monthly_returns_by_nav adds the initial NAV of the inception month when needed,
    // which is more convenient than using the ret column that navs may already carry
    tb_navs.rename!('nav', 'cumulative_nav');
    tb_month_ret = cal_monthly_returns_by_nav(entity_info, tb_navs);
    tb_month_ret.rename!('cumulative_nav', 'nav');

    indicators = cal_monthly_indicators(entity_type, 'PBI', tb_month_ret);

    return indicators;
}

/*
 *  Calculate portfolio NAVs and save them to the database.
 *
 *  @param cal_portfolio_info : table of portfolios to calculate; must contain portfolio_id
 *  @param is_save_local      : when true the NAVs are also saved to 'pfdb.pf_portfolio_nav'
 *
 *  @return : '' on success, otherwise the exception caught while saving
 *
 *  TODO: on release, switch the sync target table to the production table
 */
def cal_and_save_portfolio_nav(cal_portfolio_info, is_save_local) {

    rt = '';

    // prepare a table shaped like the MySQL schema
    tb_portfolio_nav = create_entity_nav(true);

    // run in batches
    i = 0;
    batch_size = 1000;
    all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
    id_cnt = all_portfolio_id.size();

    do {

        batch_ids = all_portfolio_id[i : min(id_cnt, i+batch_size)];

        // compute the NAVs first so they can be persisted before anything else
        portfolio_info = SELECT * FROM cal_portfolio_info WHERE portfolio_id IN batch_ids;

        if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;

        // 30 sec per 1000 portfolios
        tb_ret = cal_portfolio_nav(portfolio_info);

        INSERT INTO tb_portfolio_nav SELECT entity_id, price_date, nav FROM tb_ret;

        i += batch_size;

    // i indexes the distinct-id vector, so the loop is bounded by the number of distinct IDs
    // (not by the row count of cal_portfolio_info)
    } while (i < id_cnt);

    if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {

        // save data to MySQL (12 sec)
        try {

            tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
            save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');

            // during data initialization also keep a local copy
            if(is_save_local == true) {
                save_table(tb_portfolio_nav, 'pfdb.pf_portfolio_nav', false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  Calculate the standard portfolio indicators and save them to the database.
 *
 *  @param cal_portfolio_info : table of portfolios to calculate; must contain
 *                              portfolio_id and start_cal_date
 *  @param is_save_local      : when true the results are also saved to the pfdb.* tables
 *
 *  @return : '' on success, otherwise the exception caught while saving
 *
 *  TODO: on release, switch the sync target table to the production table
 */
def cal_and_save_portfolio_indicators(cal_portfolio_info, is_save_local) {

    rt = '';

    // prepare tables shaped like the MySQL schema
    tb_portfolio_performance = create_entity_performance(true);
    tb_portfolio_indicator = create_entity_indicator(true);
    tb_portfolio_risk_stats = create_entity_risk_stats(true);
    tb_portfolio_riskadjret_stats = create_entity_riskadjret_stats(true);
    tb_portfolio_style_stats = create_entity_style_stats(true);

    tb_portfolio_performance_weekly = create_entity_performance_weekly(true);
    tb_portfolio_latest_performance = create_entity_latest_performance(true);

    // run in batches
    i = 0;
    batch_size = 1000;
    all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
    id_cnt = all_portfolio_id.size();

    do {

        batch_ids = all_portfolio_id[i : min(id_cnt, i+batch_size)];

        cal_port = SELECT * FROM cal_portfolio_info WHERE portfolio_id IN batch_ids;

        if(cal_port.isVoid() || cal_port.size() == 0) break;

        // fetch monthly NAVs (including the preceding value) from the database (5 sec)
        s_json = (SELECT portfolio_id, 1900.01.01 AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'sec_id').toStdJson();
        tb_monthly_nav = get_nav_for_return_calculation('PF', 'm', s_json);

        // cast the portfolio id column from string back to INT, otherwise later table joins fail
        v_portfolio_id = tb_monthly_nav.sec_id$INT;
        tb_monthly_nav.replaceColumn!('sec_id', v_portfolio_id);
        tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['portfolio_id', 'nav']);

        // calculate the standard indicators
        indicators = calPortfolioPerformance(tb_monthly_nav);

        // build the records following the MySQL table layout (1s)
        port_info = (SELECT portfolio_id, start_cal_date.min() AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'entity_id');

        generate_entity_performance(port_info, indicators, true, tb_portfolio_performance);
        generate_entity_indicator(port_info, indicators, true, tb_portfolio_indicator);
        generate_entity_risk_stats(port_info, indicators, true, tb_portfolio_risk_stats);
        generate_entity_riskadjret_stats(port_info, indicators, true, tb_portfolio_riskadjret_stats);
        generate_entity_style_stats(port_info, indicators, true, tb_portfolio_style_stats);

        // weekly returns (49s)
        port_info = SELECT * FROM ej(port_info, get_entity_info('PF', batch_ids), 'entity_id');
        rets_w = cal_weekly_returns('PF', port_info);

        if(! rets_w.isVoid() && rets_w.size() > 0) {

            // cast the portfolio id column from string back to INT, otherwise later table joins fail
            v_portfolio_id = rets_w.entity_id$INT;
            rets_w.replaceColumn!('entity_id', v_portfolio_id);
            generate_entity_performance_weekly(port_info, rets_w, true, tb_portfolio_performance_weekly);
        }

        // latest performance (23s)
        perf_latest = cal_latest_performance('PF', port_info, true);

        if(! perf_latest.isVoid() && perf_latest.size() > 0) {
            generate_entity_latest_performance(port_info, perf_latest, true, tb_portfolio_latest_performance);
        }

        i += batch_size;

    // i indexes the distinct-id vector, so the loop is bounded by the number of distinct IDs
    // (not by the row count of cal_portfolio_info)
    } while (i < id_cnt);

    if(! tb_portfolio_performance.isVoid() && tb_portfolio_performance.size() > 0) {

        // save data to MySQL
        try {

            chg_columns_for_mysql(tb_portfolio_performance, 'portfolio_id');
            save_and_sync(tb_portfolio_performance, 'raw_db.pf_portfolio_performance', 'raw_db.pf_portfolio_performance');

            chg_columns_for_mysql(tb_portfolio_indicator, 'portfolio_id');
            save_and_sync(tb_portfolio_indicator, 'raw_db.pf_portfolio_indicator', 'raw_db.pf_portfolio_indicator');

            chg_columns_for_mysql(tb_portfolio_risk_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_risk_stats, 'raw_db.pf_portfolio_risk_stats', 'raw_db.pf_portfolio_risk_stats');

            chg_columns_for_mysql(tb_portfolio_riskadjret_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_riskadjret_stats, 'raw_db.pf_portfolio_riskadjret_stats', 'raw_db.pf_portfolio_riskadjret_stats');

            chg_columns_for_mysql(tb_portfolio_style_stats, 'portfolio_id');
            save_and_sync(tb_portfolio_style_stats, 'raw_db.pf_portfolio_style_stats', 'raw_db.pf_portfolio_style_stats');

            save_and_sync(tb_portfolio_performance_weekly, 'raw_db.pf_portfolio_performance_weekly', 'raw_db.pf_portfolio_performance_weekly');
            save_and_sync(tb_portfolio_latest_performance, 'raw_db.pf_portfolio_latest_performance', 'raw_db.pf_portfolio_latest_performance');

            // during data initialization also keep local copies
            if(is_save_local == true) {
                save_table(tb_portfolio_performance, 'pfdb.pf_portfolio_performance', false);
                save_table(tb_portfolio_indicator, 'pfdb.pf_portfolio_indicator', false);
                save_table(tb_portfolio_risk_stats, 'pfdb.pf_portfolio_risk_stats', false);
                save_table(tb_portfolio_riskadjret_stats, 'pfdb.pf_portfolio_riskadjret_stats', false);
                save_table(tb_portfolio_style_stats, 'pfdb.pf_portfolio_style_stats', false);
                save_table(tb_portfolio_performance_weekly, 'pfdb.pf_portfolio_performance_weekly', false);
                save_table(tb_portfolio_latest_performance, 'pfdb.pf_portfolio_latest_performance', false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  Generic version: calculate the standard indicators for any entity type and save them.
 *
 *  @param entity_type     : entity type code; 'PF' entities use integer IDs, every other
 *                           type keeps the ID type returned by the data pullers
 *  @param cal_entity_info : table of entities to calculate; must contain
 *                           entity_id and start_cal_date
 *  @param is_save_local   : when true each result is also saved to the table named by
 *                           the matching get_*_table_description(entity_type)
 *
 *  @return : '' on success, otherwise the exception caught while saving
 *
 *  TODO: on release, switch the sync target table to the production table
 */
def cal_and_save_entity_indicators(entity_type, cal_entity_info, is_save_local) {

    // cal_entity_info = tb_cal_factors
    // entity_type = 'FA'

    rt = '';

    is_id_integer = iif(entity_type == 'PF', true, false);

    // prepare tables shaped like the MySQL schema
    tb_entity_performance = create_entity_performance(is_id_integer);
    tb_entity_indicator = create_entity_indicator(is_id_integer);
    tb_entity_risk_stats = create_entity_risk_stats(is_id_integer);
    tb_entity_riskadjret_stats = create_entity_riskadjret_stats(is_id_integer);
    tb_entity_style_stats = create_entity_style_stats(is_id_integer);

    tb_entity_performance_weekly = create_entity_performance_weekly(is_id_integer);
    tb_entity_latest_performance = create_entity_latest_performance(is_id_integer);

    // run in batches
    i = 0;
    batch_size = 1000;
    all_entity_id = EXEC DISTINCT entity_id FROM cal_entity_info;
    id_cnt = all_entity_id.size();

    do {

        batch_ids = all_entity_id[i : min(id_cnt, i+batch_size)];

        cal_entity = SELECT * FROM cal_entity_info WHERE entity_id IN batch_ids;

        if(cal_entity.isVoid() || cal_entity.size() == 0) break;

        // fetch monthly NAVs (including the preceding value) from the database (5 sec)
        s_json = (SELECT entity_id, 1900.01.01 AS price_date FROM cal_entity GROUP BY entity_id).rename!('entity_id', 'sec_id').toStdJson();
        tb_monthly_nav = get_nav_for_return_calculation(entity_type, 'm', s_json);

        // for portfolios, cast the id column from string back to INT, otherwise later table joins fail
        if(entity_type=='PF') {
            v_entity_id = tb_monthly_nav.sec_id$INT;
            tb_monthly_nav.replaceColumn!('sec_id', v_entity_id);
        }
        tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['entity_id', 'nav']);

        // calculate the standard indicators
        indicators = calEntityPerformance(entity_type, tb_monthly_nav);

        // build the records following the MySQL table layout (1s)
        entity_info = (SELECT entity_id, start_cal_date.min() AS price_date FROM cal_entity GROUP BY entity_id);

        generate_entity_performance(entity_info, indicators, true, tb_entity_performance);
        generate_entity_indicator(entity_info, indicators, true, tb_entity_indicator);
        generate_entity_risk_stats(entity_info, indicators, true, tb_entity_risk_stats);
        generate_entity_riskadjret_stats(entity_info, indicators, true, tb_entity_riskadjret_stats);
        generate_entity_style_stats(entity_info, indicators, true, tb_entity_style_stats);

        // weekly returns (49s)
        entity_info = SELECT * FROM ej(entity_info, get_entity_info(entity_type, batch_ids), 'entity_id');
        rets_w = cal_weekly_returns(entity_type, entity_info);

        if(! rets_w.isVoid() && rets_w.size() > 0) {

            // for portfolios, cast the id column from string back to INT, otherwise later table joins fail
            if(entity_type == 'PF') {
                v_entity_id = rets_w.entity_id$INT;
                rets_w.replaceColumn!('entity_id', v_entity_id);
            }
            generate_entity_performance_weekly(entity_info, rets_w, true, tb_entity_performance_weekly);
        }

        // latest performance (23s)
        perf_latest = cal_latest_performance(entity_type, entity_info, true);

        if(! perf_latest.isVoid() && perf_latest.size() > 0) {
            generate_entity_latest_performance(entity_info, perf_latest, true, tb_entity_latest_performance);
        }

        i += batch_size;

    // i indexes the distinct-id vector, so the loop is bounded by the number of distinct IDs
    // (not by the row count of cal_entity_info)
    } while (i < id_cnt);

    if(! tb_entity_performance.isVoid() && tb_entity_performance.size() > 0) {

        // save data to MySQL; the sync target is derived from the local table name by
        // replacing the 'pfdb' / 'mfdb' prefix with 'raw_db'
        try {

            des = get_performance_table_description(entity_type)[0];
            chg_columns_for_mysql(tb_entity_performance, des.sec_id_col);
            tb_entity_performance.rename!('cumulative_nav', des.cumulative_nav_col);
            save_and_sync(tb_entity_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_performance, des.table_name, false);

            des = get_indicator_table_description(entity_type)[0];
            chg_columns_for_mysql(tb_entity_indicator, des.sec_id_col);
            save_and_sync(tb_entity_indicator, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_indicator, des.table_name, false);

            des = get_risk_stats_table_description(entity_type)[0];
            chg_columns_for_mysql(tb_entity_risk_stats, des.sec_id_col);
            save_and_sync(tb_entity_risk_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_risk_stats, des.table_name, false);

            des = get_riskadjret_stats_table_description(entity_type)[0];
            chg_columns_for_mysql(tb_entity_riskadjret_stats, des.sec_id_col);
            save_and_sync(tb_entity_riskadjret_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_riskadjret_stats, des.table_name, false);

            des = get_capture_style_table_description(entity_type)[0];
            chg_columns_for_mysql(tb_entity_style_stats, des.sec_id_col);
            save_and_sync(tb_entity_style_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_style_stats, des.table_name, false);

            des = get_performance_weekly_table_description(entity_type)[0];
            tb_entity_performance_weekly.rename!('cumulative_nav', des.cumulative_nav_col);
            save_and_sync(tb_entity_performance_weekly, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_performance_weekly, des.table_name, false);

            des = get_latest_performance_table_description(entity_type)[0];
            tb_entity_latest_performance.rename!('cumulative_nav', des.cumulative_nav_col);
            save_and_sync(tb_entity_latest_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
            if(is_save_local == true) save_table(tb_entity_latest_performance, des.table_name, false);

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  [Scheduled task] Batch-calculate portfolio NAVs, returns and indicators.
 *
 *  @param updatetime : NAV update time of the held securities; a value on or before
 *                      get_ini_data_const()['date'] (the original note says 1989.01.01 or
 *                      earlier) is treated as a data initialization run
 *
 *  @return : status string, the results of the NAV step and the indicator step joined by '; '
 *            ('' when there is nothing to calculate)
 *
 *  Example: CalPortfolioPerformanceTask(2024.10.28);
 *           CalPortfolioPerformanceTask(1989.01.01);  -- [initialization only] (45min)
 */
def CalPortfolioPerformanceTask(updatetime) {

    rt = '';

    // 3 min
    tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true);

    // nothing to do: return the (empty) status string, like CalFactorPerformanceTask does
    if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return rt;

    is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);

    // 26 min
    rt = cal_and_save_portfolio_nav(tb_cal_ports, is_save_local);

    // 9 min
    rt = rt + '; ' + cal_and_save_entity_indicators('PF', tb_cal_ports, is_save_local);

    return rt;
}

/*
 *  Batch-calculate BFI factor NAVs and save them to the database.
 *
 *  @param cal_factor_info : table of factors to calculate; must contain
 *                           factor_id, first_cal_date and latest_cal_date
 *  @param is_save_local   : when true the values are also saved to 'pfdb.cm_factor_value'
 *
 *  @return : '' on success, otherwise the exception caught while saving
 *
 *  Example: cal_and_save_factor_nav(get_bfi_factor_list_by_index_nav_updatetime(['FA00000VMJ'], updatetime, true), false);
 *
 */
def cal_and_save_factor_nav(cal_factor_info, is_save_local) {

    rt = '';

    t_factor_value = table(100:0, ['factor_id', 'price_date', 'factor_value'], [SYMBOL, DATE, DOUBLE]);

    // there are only a limited number of factors, so a plain loop is the simplest approach
    for(factor in cal_factor_info) {

        v_factor_id = array(STRING, 0).append!(factor.factor_id);

        // fetch the constituent indexes of the factor
        tb_holdings = get_fixed_weight_portfolio_holding('FA', v_factor_id);

        UPDATE tb_holdings
        SET first_cal_date = first_cal_date, latest_cal_date = latest_cal_date
        FROM ej(tb_holdings, cal_factor_info, 'entity_id', 'factor_id');

        s_json = (SELECT sec_id, first_cal_date.min() AS price_date FROM tb_holdings GROUP BY sec_id).toStdJson();

        // fetch the constituent index levels, including the value preceding the start date
        tb_nav = get_nav_for_return_calculation('MI', 'd', s_json).sortBy!(['sec_id', 'price_date'], [1, 1]);

        // per-period return of every constituent
        UPDATE tb_nav SET ret = cumulative_nav.ratios() - 1 CONTEXT BY sec_id;

        t = SELECT h.entity_id, n.price_date, h.sec_id, n.ret, h.weight/100 AS weight
            FROM tb_holdings AS h
            INNER JOIN tb_nav AS n ON h.sec_id = n.sec_id
            ORDER BY h.entity_id, h.sec_id, n.price_date;

        t_factor = SELECT factor_id AS entity_id, first_cal_date, latest_cal_date
                   FROM cal_factor_info
                   WHERE factor_id = factor.factor_id;

        t_tmp = cal_nav_by_return('FA', t_factor, t);

        if(!t_tmp.isVoid() && t_tmp.size() > 0) {
            // reuse the result computed above instead of running cal_nav_by_return a second time
            INSERT INTO t_factor_value SELECT entity_id AS factor_id, price_date, nav AS factor_value FROM t_tmp;
        }
    }

    if(! t_factor_value.isVoid() && t_factor_value.size() > 0) {

        // same error handling as the other cal_and_save_* functions:
        // report the failure through the return value
        try {

            save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value');

            if(is_save_local == true) {
                save_table(t_factor_value, 'pfdb.cm_factor_value', false);
            }

        } catch(ex) {

            //TODO: Log errors
            rt = ex;
        }
    }

    return rt;
}

/*
 *  [Scheduled task] Batch-calculate BFI factor NAVs, returns and indicators.
 *
 *  @param updatetime : NAV update time of the constituent indexes; a value on or before
 *                      get_ini_data_const()['date'] (the original note says 1989.01.01 or
 *                      earlier) is treated as a data initialization run
 *
 *  @return : status string, the results of the NAV step and the indicator step joined by '; '
 *            ('' when there is nothing to calculate)
 *
 *  Example: CalFactorPerformanceTask(2024.10.28);
 *           CalFactorPerformanceTask(1989.01.01);  -- [initialization only] (1.3 min)
 */
def CalFactorPerformanceTask(updatetime) {

    rt = '';

    // pick the factors affected by constituent index NAVs updated since updatetime
    tb_cal_factors = get_bfi_factor_list_by_index_nav_updatetime(NULL, updatetime, true);

    if(tb_cal_factors.isVoid() || tb_cal_factors.size() == 0) return rt;

    is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);

    // 26 min -- NOTE(review): timing looks copied from the portfolio task; confirm for factors
    rt = cal_and_save_factor_nav(tb_cal_factors, is_save_local);

    // 9 min -- NOTE(review): timing looks copied from the portfolio task; confirm for factors
    // cal_and_save_entity_indicators expects entity_id / start_cal_date column names
    tb_cal_factors.rename!(['factor_id', 'first_cal_date', 'latest_cal_date'], ['entity_id', 'start_cal_date', 'end_cal_date']);
    rt = rt + '; ' + cal_and_save_entity_indicators('FA', tb_cal_factors, is_save_local);

    return rt;
}