123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- module fundit::task_portfolioPerformance
- use fundit::sqlUtilities;
- use fundit::operationDataPuller;
- use fundit::performanceDataPuller;
- use fundit::portfolioDataPuller;
- use fundit::dataSaver;
- use fundit::navCalculator;
- use fundit::returnCalculator;
- use fundit::indicatorCalculator;
- /*
- * 计算组合历史净值(不存数据库)
- *
- * @param portfolio_ids <STRING|VECTOR>: 组合IDS,为空时跑全集(但不建议,因为可能会很吃内存)
- * @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略时跑全历史
- *
- * @return <TABLE>: portfolio_id, price_date, ret, nav
- *
- * Example:calPortfolioNAV([143109, 145041]);
- * calPortfolioNAV([143109, 145041], 2024.10.28);
- */
- def calPortfolioNAV(portfolio_ids, updatetime=1900.01.01) {
- // portfolio_ids=[364743, 364744];
- // updatetime=1900.01.01;
- port_info = get_portfolio_list_by_fund_nav_updatetime(portfolio_ids, updatetime, true);
-
- tb_nav = cal_portfolio_nav(port_info);
- return tb_nav;
- }
- /*
- * 计算组合历史收益和指标(不存数据库)
- *
- * @param navs <TABLE>: NEED COLUMNS portfolio_id, price_date, ret, nav
- *
- * @return <DICTIONARY>:
- *
- * Example:calPortfolioPerformance(calPortfolioNAV([143109, 145041]));
- */
- def calPortfolioPerformance(navs) {
- if(navs.isVoid() || navs.size() == 0) return;
- tb_navs = navs;
- tb_navs.rename!(['portfolio_id'], ['entity_id']);
- port_ids = EXEC DISTINCT entity_id from tb_navs;
- port_info = get_entity_info('PF', port_ids);
- // 这个函数会根据情况加入成立日当月的初始净值,比直接用navs表中可能带的ret更方便
- tb_navs.rename!('nav', 'cumulative_nav');
- tb_month_ret = cal_monthly_returns_by_nav(port_info, tb_navs);
- tb_month_ret.rename!('cumulative_nav', 'nav');
- indicators = cal_monthly_indicators('PF', 'PBI', tb_month_ret);
- return indicators;
- }
- /*
- * 计算组合历史收益和指标(不存数据库)
- *
- * @param navs <TABLE>: NEED COLUMNS entity_id, price_date, ret, nav
- *
- * @return <DICTIONARY>:
- *
- * Example:calEntityPerformance('PF', calPortfolioNAV([143109, 145041]));
- */
- def calEntityPerformance(entity_type, navs) {
- if(navs.isVoid() || navs.size() == 0) return;
- tb_navs = navs;
- //tb_navs.rename!(['portfolio_id'], ['entity_id']);
- entity_ids = EXEC DISTINCT entity_id from tb_navs;
- entity_info = get_entity_info(entity_type, entity_ids);
- // 这个函数会根据情况加入成立日当月的初始净值,比直接用navs表中可能带的ret更方便
- tb_navs.rename!('nav', 'cumulative_nav');
- tb_month_ret = cal_monthly_returns_by_nav(entity_info, tb_navs);
- tb_month_ret.rename!('cumulative_nav', 'nav');
- indicators = cal_monthly_indicators(entity_type, 'PBI', tb_month_ret);
- return indicators;
- }
- /*
- * 计算组合净值并存入数据库
- *
- * TODO: release 时改变同步目标表为正式表
- */
- def cal_and_save_portfolio_nav(cal_portfolio_info, is_save_local) {
- rt = '';
- // 准备类似MySQL结构的数据表
- tb_portfolio_nav = create_entity_nav(true);
- // 分批跑
- i = 0;
- batch_size = 1000;
- all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
- do { // 先把净值算出来存入数据库,落袋为安
- portfolio_info = SELECT * FROM cal_portfolio_info
- WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)];
- if(portfolio_info.isVoid() || portfolio_info.size() == 0) break;
- // 30 sec per 1000 portfolios
- tb_ret = cal_portfolio_nav(portfolio_info);
- INSERT INTO tb_portfolio_nav SELECT entity_id, price_date, nav FROM tb_ret;
- i += batch_size;
- } while (i <= cal_portfolio_info.size());
- if(! tb_portfolio_nav.isVoid() && tb_portfolio_nav.size() > 0) {
- // save data to MySQL (12 sec)
- try {
- tb_portfolio_nav.rename!('entity_id', 'portfolio_id');
- save_and_sync(tb_portfolio_nav, 'raw_db.pf_portfolio_nav', 'raw_db.pf_portfolio_nav');
- // 数据初始化时将指标存入本地
- if(is_save_local == true) {
- save_table(tb_portfolio_nav, 'pfdb.pf_portfolio_nav', false);
- }
- } catch(ex) {
- //TODO: Log errors
- rt = ex;
- }
- }
- return rt;
- }
- /*
- * 计算组合标准指标并存入数据库
- *
- * TODO: release 时改变同步目标表为正式表
- */
- def cal_and_save_portfolio_indicators(cal_portfolio_info, is_save_local) {
- rt = '';
- // 准备类似MySQL结构的数据表
- tb_portfolio_performance = create_entity_performance(true);
- tb_portfolio_indicator = create_entity_indicator(true);
- tb_portfolio_risk_stats = create_entity_risk_stats(true);
- tb_portfolio_riskadjret_stats = create_entity_riskadjret_stats(true);
- tb_portfolio_style_stats = create_entity_style_stats(true);
- tb_portfolio_performance_weekly = create_entity_performance_weekly(true);
- tb_portfolio_latest_performance = create_entity_latest_performance(true);
- // 分批跑
- i = 0;
- batch_size = 1000;
- all_portfolio_id = EXEC DISTINCT portfolio_id FROM cal_portfolio_info;
- do {
- cal_port = SELECT * FROM cal_portfolio_info
- WHERE portfolio_id IN all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)];
- if(cal_port.isVoid() || cal_port.size() == 0) break;
- // 取数据库月度净值及前值 5 sec
- s_json = (SELECT portfolio_id, 1900.01.01 AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'sec_id').toStdJson();
- tb_monthly_nav = get_nav_for_return_calculation('PF', 'm', s_json);
- // 把 portfolio id 字段从字符串换回整型,不然后面Join table的时候会出错
- v_portfolio_id = tb_monthly_nav.sec_id$INT;
- tb_monthly_nav.replaceColumn!('sec_id', v_portfolio_id);
- tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['portfolio_id', 'nav']);
- // 计算各标准指标
- indicators = calPortfolioPerformance(tb_monthly_nav);
- // 仿照MySQL的表结构准备好记录 (1s)
- port_info = (SELECT portfolio_id, start_cal_date.min() AS price_date FROM cal_port GROUP BY portfolio_id).rename!('portfolio_id', 'entity_id');
-
- generate_entity_performance(port_info, indicators, true, tb_portfolio_performance);
- generate_entity_indicator(port_info, indicators, true, tb_portfolio_indicator);
- generate_entity_risk_stats(port_info, indicators, true, tb_portfolio_risk_stats);
- generate_entity_riskadjret_stats(port_info, indicators, true, tb_portfolio_riskadjret_stats);
- generate_entity_style_stats(port_info, indicators, true, tb_portfolio_style_stats);
- // 计算周收益 (49s)
- port_info = SELECT * FROM ej(port_info, get_entity_info('PF', all_portfolio_id[i : min(all_portfolio_id.size(), i+batch_size)]), 'entity_id')
- rets_w = cal_weekly_returns('PF', port_info);
- if(! rets_w.isVoid() && rets_w.size() > 0) {
- // 把 portfolio id 字段从字符串换回整型,不然后面Join table的时候会出错
- v_portfolio_id = rets_w.entity_id$INT;
- rets_w.replaceColumn!('entity_id', v_portfolio_id);
-
- generate_entity_performance_weekly(port_info, rets_w, true, tb_portfolio_performance_weekly);
- }
- // 计算最新收益 (23s)
- perf_latest = cal_latest_performance('PF', port_info, true);
- if(! perf_latest.isVoid() && perf_latest.size() > 0) {
- generate_entity_latest_performance(port_info, perf_latest, true, tb_portfolio_latest_performance);
- }
- i += batch_size;
- } while (i <= cal_portfolio_info.size());
- if(! tb_portfolio_performance.isVoid() && tb_portfolio_performance.size() > 0) {
- // save data to MySQL
- try {
- chg_columns_for_mysql(tb_portfolio_performance, 'portfolio_id');
- save_and_sync(tb_portfolio_performance, 'raw_db.pf_portfolio_performance', 'raw_db.pf_portfolio_performance');
- chg_columns_for_mysql(tb_portfolio_indicator, 'portfolio_id');
- save_and_sync(tb_portfolio_indicator, 'raw_db.pf_portfolio_indicator', 'raw_db.pf_portfolio_indicator');
- chg_columns_for_mysql(tb_portfolio_risk_stats, 'portfolio_id');
- save_and_sync(tb_portfolio_risk_stats, 'raw_db.pf_portfolio_risk_stats', 'raw_db.pf_portfolio_risk_stats');
- chg_columns_for_mysql(tb_portfolio_riskadjret_stats, 'portfolio_id');
- save_and_sync(tb_portfolio_riskadjret_stats, 'raw_db.pf_portfolio_riskadjret_stats', 'raw_db.pf_portfolio_riskadjret_stats');
- chg_columns_for_mysql(tb_portfolio_style_stats, 'portfolio_id');
- save_and_sync(tb_portfolio_style_stats, 'raw_db.pf_portfolio_style_stats', 'raw_db.pf_portfolio_style_stats');
- save_and_sync(tb_portfolio_performance_weekly, 'raw_db.pf_portfolio_performance_weekly', 'raw_db.pf_portfolio_performance_weekly');
- save_and_sync(tb_portfolio_latest_performance, 'raw_db.pf_portfolio_latest_performance', 'raw_db.pf_portfolio_latest_performance');
- // 数据初始化时将指标存入本地
- if(is_save_local == true) {
- save_table(tb_portfolio_performance, 'pfdb.pf_portfolio_performance', false);
- save_table(tb_portfolio_indicator, 'pfdb.pf_portfolio_indicator', false);
- save_table(tb_portfolio_risk_stats, 'pfdb.pf_portfolio_risk_stats', false);
- save_table(tb_portfolio_riskadjret_stats, 'pfdb.pf_portfolio_riskadjret_stats', false);
- save_table(tb_portfolio_style_stats, 'pfdb.pf_portfolio_style_stats', false);
- save_table(tb_portfolio_performance_weekly, 'pfdb.pf_portfolio_performance_weekly', false);
- save_table(tb_portfolio_latest_performance, 'pfdb.pf_portfolio_latest_performance', false);
- }
- } catch(ex) {
- //TODO: Log errors
- rt = ex;
- }
- }
- return rt;
- }
- /*
- * 通用计算标准指标并存入数据库
- *
- * TODO: release 时改变同步目标表为正式表
- */
- def cal_and_save_entity_indicators(entity_type, cal_entity_info, is_save_local) {
- // cal_entity_info = tb_cal_factors
- // entity_type = 'FA'
- rt = '';
- is_id_interger = iif(entity_type == 'PF', true, false);
-
- // 准备类似MySQL结构的数据表
- tb_entity_performance = create_entity_performance(is_id_interger);
- tb_entity_indicator = create_entity_indicator(is_id_interger);
- tb_entity_risk_stats = create_entity_risk_stats(is_id_interger);
- tb_entity_riskadjret_stats = create_entity_riskadjret_stats(is_id_interger);
- tb_entity_style_stats = create_entity_style_stats(is_id_interger);
- tb_entity_performance_weekly = create_entity_performance_weekly(is_id_interger);
- tb_entity_latest_performance = create_entity_latest_performance(is_id_interger);
- // 分批跑
- i = 0;
- batch_size = 1000;
- all_entity_id = EXEC DISTINCT entity_id FROM cal_entity_info;
- do {
- cal_entity = SELECT * FROM cal_entity_info
- WHERE entity_id IN all_entity_id[i : min(all_entity_id.size(), i+batch_size)];
- if(cal_entity.isVoid() || cal_entity.size() == 0) break;
- // 取数据库月度净值及前值 5 sec
- s_json = (SELECT entity_id, 1900.01.01 AS price_date FROM cal_entity GROUP BY entity_id).rename!('entity_id', 'sec_id').toStdJson();
- tb_monthly_nav = get_nav_for_return_calculation(entity_type, 'm', s_json);
- // 把组合 entity id 字段从字符串换回整型,不然后面Join table的时候会出错
- if(entity_type=='PF') {
- v_entity_id = tb_monthly_nav.sec_id$INT;
- tb_monthly_nav.replaceColumn!('sec_id', v_entity_id);
- }
-
- tb_monthly_nav.dropColumns!('nav').rename!(['sec_id', 'cumulative_nav'], ['entity_id', 'nav']);
- // 计算各标准指标
- indicators = calEntityPerformance(entity_type, tb_monthly_nav);
- // 仿照MySQL的表结构准备好记录 (1s)
- entity_info = (SELECT entity_id, start_cal_date.min() AS price_date FROM cal_entity GROUP BY entity_id);
- generate_entity_performance(entity_info, indicators, true, tb_entity_performance);
- generate_entity_indicator(entity_info, indicators, true, tb_entity_indicator);
- generate_entity_risk_stats(entity_info, indicators, true, tb_entity_risk_stats);
- generate_entity_riskadjret_stats(entity_info, indicators, true, tb_entity_riskadjret_stats);
- generate_entity_style_stats(entity_info, indicators, true, tb_entity_style_stats);
- // 计算周收益 (49s)
- entity_info = SELECT * FROM ej(entity_info, get_entity_info(entity_type, all_entity_id[i : min(all_entity_id.size(), i+batch_size)]), 'entity_id')
- rets_w = cal_weekly_returns(entity_type, entity_info);
- if(! rets_w.isVoid() && rets_w.size() > 0) {
- // 把 entity id 字段从字符串换回整型,不然后面Join table的时候会出错
- if(entity_type == 'PF') {
- v_entity_id = rets_w.entity_id$INT;
- rets_w.replaceColumn!('entity_id', v_entity_id);
- }
- generate_entity_performance_weekly(entity_info, rets_w, true, tb_entity_performance_weekly);
- }
- // 计算最新收益 (23s)
- perf_latest = cal_latest_performance(entity_type, entity_info, true);
- if(! perf_latest.isVoid() && perf_latest.size() > 0) {
- generate_entity_latest_performance(entity_info, perf_latest, true, tb_entity_latest_performance);
- }
- i += batch_size;
- } while (i <= cal_entity_info.size());
- if(! tb_entity_performance.isVoid() && tb_entity_performance.size() > 0) {
- // save data to MySQL
- try {
- des = get_performance_table_description(entity_type)[0];
- chg_columns_for_mysql(tb_entity_performance, des.sec_id_col);
- tb_entity_performance.rename!('cumulative_nav', des.cumulative_nav_col);
- save_and_sync(tb_entity_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_performance, des.table_name, false);
- des = get_indicator_table_description(entity_type)[0];
- chg_columns_for_mysql(tb_entity_indicator, des.sec_id_col);
- save_and_sync(tb_entity_indicator, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_indicator, des.table_name, false);
- des = get_risk_stats_table_description(entity_type)[0];
- chg_columns_for_mysql(tb_entity_risk_stats, des.sec_id_col);
- save_and_sync(tb_entity_risk_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_risk_stats, des.table_name, false);
- des = get_riskadjret_stats_table_description(entity_type)[0];
- chg_columns_for_mysql(tb_entity_riskadjret_stats, des.sec_id_col);
- save_and_sync(tb_entity_riskadjret_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_riskadjret_stats, des.table_name, false);
- des = get_capture_style_table_description(entity_type)[0];
- chg_columns_for_mysql(tb_entity_style_stats, des.sec_id_col);
- save_and_sync(tb_entity_style_stats, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_style_stats, des.table_name, false);
- des = get_performance_weekly_table_description(entity_type)[0];
- tb_entity_performance_weekly.rename!('cumulative_nav', des.cumulative_nav_col);
- save_and_sync(tb_entity_performance_weekly, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_performance_weekly, des.table_name, false);
- des = get_latest_performance_table_description(entity_type)[0];
- tb_entity_latest_performance.rename!('cumulative_nav', des.cumulative_nav_col);
- save_and_sync(tb_entity_latest_performance, des.table_name.strReplace('pfdb', 'raw_db').strReplace('mfdb', 'raw_db'), );
- if(is_save_local == true) save_table(tb_entity_latest_performance, des.table_name, false);
- } catch(ex) {
- //TODO: Log errors
- rt = ex;
- }
- }
- return rt;
- }
- /*
- * [定时任务]批量计算组合净值、收益及指标
- *
- * @param updatetime <DATETIME>: 持仓证券净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化
- *
- *
- * Example: CalPortfolioPerformanceTask(2024.10.28);
- * CalPortfolioPerformanceTask(1989.01.01); -- 【初始化专用】 (45min)
- */
- def CalPortfolioPerformanceTask(updatetime) {
- rt = '';
- // 3 min
- tb_cal_ports = get_portfolio_list_by_fund_nav_updatetime(NULL, updatetime, true);
- if(tb_cal_ports.isVoid() || tb_cal_ports.size() == 0) return;
- is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);
- // 26 min
- rt = cal_and_save_portfolio_nav(tb_cal_ports, is_save_local);
- // 9 min
- rt = rt + '; ' + cal_and_save_entity_indicators('PF', tb_cal_ports, is_save_local);
- return rt;
- }
- /*
- * 批量计算BFI因子净值
- *
- * Example: cal_and_save_factor_nav(get_bfi_factor_list_by_index_nav_updatetime(['FA00000VMJ'], updatetime, true);, false);
- *
- */
- def cal_and_save_factor_nav(cal_factor_info, is_save_local) {
- ret = ''
- t_factor_value = table(100:0, ['factor_id', 'price_date', 'factor_value'], [SYMBOL, DATE, DOUBLE]);
- // 因子个数有限,用循环更简便
- for(factor in cal_factor_info) {
- v_factor_id = array(STRING, 0).append!(factor.factor_id);
- // 取因子成分指数
- tb_holdings = get_fixed_weight_portfolio_holding('FA', v_factor_id);
- UPDATE tb_holdings SET first_cal_date = first_cal_date, latest_cal_date = latest_cal_date
- FROM ej(tb_holdings, cal_factor_info, 'entity_id', 'factor_id');
- s_json = (SELECT sec_id, first_cal_date.min() AS price_date FROM tb_holdings GROUP BY sec_id).toStdJson();
- // 取含前值的成分指数点位
- tb_nav = get_nav_for_return_calculation('MI', 'd', s_json).sortBy!(['sec_id', 'price_date'], [1, 1]);
- // 计算每期收益
- UPDATE tb_nav SET ret = cumulative_nav.ratios() - 1 CONTEXT BY sec_id;
- t = SELECT h.entity_id, n.price_date, h.sec_id, n.ret, h.weight/100 AS weight
- FROM tb_holdings AS h
- INNER JOIN tb_nav AS n ON h.sec_id = n.sec_id
- ORDER BY h.entity_id, h.sec_id, n.price_date;
- t_factor = SELECT factor_id AS entity_id, first_cal_date, latest_cal_date FROM cal_factor_info WHERE factor_id = factor.factor_id;
- t_tmp = cal_nav_by_return('FA', t_factor, t);
- if(!t_tmp.isVoid() && t_tmp.size() > 0) {
- INSERT INTO t_factor_value
- SELECT entity_id AS factor_id, price_date, nav AS factor_value FROM cal_nav_by_return('FA', t_factor, t);
- }
- }
- if(! t_factor_value.isVoid() && t_factor_value.size() > 0) {
- save_and_sync(t_factor_value, 'raw_db.cm_factor_value', 'raw_db.cm_factor_value');
- if(is_save_local == true) {
- save_table(t_factor_value, 'pfdb.cm_factor_value', false);
- }
- }
- }
- /*
- * [定时任务]批量计算bfi因子净值、收益及指标
- *
- * @param updatetime <DATETIME>: 成分指数净值更新时间,忽略或传入1989.01.01及更早的日期被认为在做数据初始化
- *
- *
- * Example: CalFactorPerformanceTask(2024.10.28);
- * CalFactorPerformanceTask(1989.01.01); -- 【初始化专用】 (1.3 min)
- */
- def CalFactorPerformanceTask(updatetime) {
- rt = '';
- // 根据成分指数净值更新日期,取有影响的因子
- tb_cal_factors = get_bfi_factor_list_by_index_nav_updatetime(NULL, updatetime, true);
- if(tb_cal_factors.isVoid() || tb_cal_factors.size() == 0) return rt;
- is_save_local = iif(updatetime <= get_ini_data_const()['date'], true, false);
- // 26 min
- rt = cal_and_save_factor_nav(tb_cal_factors, is_save_local);
- // 9 min
- tb_cal_factors.rename!(['factor_id', 'first_cal_date', 'latest_cal_date'], ['entity_id', 'start_cal_date', 'end_cal_date']);
- rt = rt + '; ' + cal_and_save_entity_indicators('FA', tb_cal_factors, is_save_local);
- return rt;
- }
|