task_fundPerformance.dos 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. module fundit::task_fundPerformance
  2. use fundit::sqlUtilities;
  3. use fundit::operationDataPuller;
  4. use fundit::performanceDataPuller;
  5. use fundit::dataSaver;
  6. use fundit::returnCalculator;
  7. use fundit::indicatorCalculator;
  8. use fundit::rbsaCalculator;
  9. use fundit::bfiMatcher;
  10. use fundit::ms_dataPuller;
  11. /*
  12. * [定时任务]:最新净值触发的业绩指标计算
  13. *
  14. * @param entityType <STRING>: 'MF', 'HF'...
  15. * @param date <DATETIME>: 净值更新时间, 为空时缺省为当前时间-1天;为1990.01.01或更早日期时代表初始化,指标会被存入本地数据库
  16. *
  17. * NOTE: 与Java不同的是当月indicator计算每日触发,不必等到Month-end production
  18. *
  19. * Example: calFundPerformanceTask('MF', 2024.10.28);
  20. * calFundPerformanceTask('MI', 2024.10.28);
  21. */
  22. def calFundPerformanceTask(entityType, date) {
  23. rt = '';
  24. if(!(entityType IN ['MF', 'HF', 'MI', 'FI'])) return null;
  25. if(date.isNothing() || date.isNull())
  26. end_day = temporalAdd(now(), -1d);
  27. else
  28. end_day = date;
  29. // 取有最新净值变动的基金列表 (1s)
  30. tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, end_day, true);
  31. if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
  32. // 按照 MySQL 建好各表
  33. tb_fund_performance = create_entity_performance();
  34. tb_fund_indicator = create_entity_indicator();
  35. tb_fund_risk_stats = create_entity_risk_stats();
  36. tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
  37. tb_fund_style_stats = create_entity_style_stats();
  38. tb_fund_performance_weekly = create_entity_performance_weekly();
  39. tb_fund_latest_performance = create_entity_latest_performance();
  40. // 分批跑
  41. i = 0;
  42. batch_size = 1000;
  43. do {
  44. funds = tb_cal_funds[i : min(tb_cal_funds.size(), i+batch_size)];
  45. if(funds.isVoid() || funds.size() == 0) break;
  46. // 200ms
  47. fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
  48. FROM ej(funds, get_entity_info(entityType, funds.entity_id), 'entity_id');
  49. // 计算月收益 (12s)
  50. rets = mix_monthly_returns(entityType, fund_info);
  51. if(!rets.isVoid() && rets.size() > 0) {
  52. // 计算月度指标 (56s)
  53. rets.rename!('cumulative_nav', 'nav');
  54. indicators = cal_monthly_indicators(entityType, 'PBI', rets);
  55. // 仿照MySQL的表结构准备好记录 (1s)
  56. generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
  57. generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
  58. generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
  59. generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
  60. generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
  61. }
  62. // 计算周收益 (8s)
  63. rets_w = cal_weekly_returns(entityType, fund_info);
  64. if(! rets_w.isVoid() && rets_w.size() > 0) {
  65. generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
  66. }
  67. // 计算最新收益 (69s)
  68. perf_latest = cal_latest_performance(entityType, fund_info, true);
  69. if(! perf_latest.isVoid() && perf_latest.size() > 0) {
  70. generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
  71. }
  72. i += batch_size;
  73. // } while (i < batch_size);
  74. } while (i <= tb_cal_funds.size());
  75. if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {
  76. // save data to MySQL (13s)
  77. try {
  78. chg_columns_for_mysql(tb_fund_performance, 'fund_id');
  79. save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
  80. chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
  81. save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
  82. chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
  83. // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
  84. save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
  85. chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
  86. save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
  87. chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
  88. save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
  89. save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
  90. save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
  91. // 数据初始化时将指标存入本地,做排名之用
  92. if(end_day <= 1990.01.01) {
  93. save_table(tb_fund_performance, 'raw_db.fund_performance', false);
  94. save_table(tb_fund_indicator, 'raw_db.fund_indicator', false);
  95. save_table(tb_fund_risk_stats, 'raw_db.fund_risk_stats', false);
  96. save_table(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', false);
  97. save_table(tb_fund_style_stats, 'raw_db.fund_style_stats', false);
  98. }
  99. } catch(ex) {
  100. //TODO: Log errors
  101. rt = ex;
  102. }
  103. }
  104. return rt;
  105. }
  106. /*
  107. * [定时任务] 计算BFI指标并存入数据库
  108. *
  109. * @param entityType <STRING>: 'MF', 'HF', 'PF'; 前两个是一样的
  110. * @param date <DATETIME>: BFI更新时间, 为空时缺省为当前时间的前1天;为1989.01.01或更早日期时代表初始化,指标会被存入本地数据库
  111. *
  112. *
  113. * Example: calEntityBfiIndicatorTask('MF', 2024.10.28);
  114. * calEntityBfiIndicatorTask('PF', 2024.10.28);
  115. */
  116. def calEntityBfiIndicatorTask(entityType, date) {
  117. // entityType = 'MF'
  118. // date = 2024.10.01
  119. rt = '';
  120. if(!(entityType IN ['MF', 'HF', 'PF'])) return null;
  121. very_old_day = 1900.01.01;
  122. if(date.isNothing() || date.isNull())
  123. end_day = temporalAdd(now(), -1d);
  124. else
  125. end_day = date;
  126. // 1989.01.01及以前的日期被认为从本地读数据
  127. isFromMySQL = iif(end_day <= 1989.01.01, false, true);
  128. // 取有最新bfi变动的基金列表 (1s)
  129. tb_cal_entities = get_entity_bfi_factors(entityType, NULL, very_old_day.month(), today().month(), end_day);
  130. if(tb_cal_entities.isVoid() || tb_cal_entities.size() == 0 ) return;
  131. v_uniq_entity_id = EXEC DISTINCT entity_id FROM tb_cal_entities;
  132. // 按照 MySQL 建好各表
  133. tb_bfi_indicator = create_entity_bfi_indicator(iif(entityType=='PF', true, false));
  134. // 分批跑
  135. i = 0;
  136. batch_size = 100;
  137. do {
  138. entities = SELECT * FROM tb_cal_entities WHERE entity_id IN v_uniq_entity_id[i : min(v_uniq_entity_id.size(), i+batch_size)];
  139. if(entities.isVoid() || entities.size() == 0) break;
  140. // 200ms
  141. entity_info = SELECT entity_id, end_date.temporalParse('yyyy-MM') AS end_date, inception_date, factor_id AS benchmark_id, ini_value
  142. FROM ej(entities, get_entity_info(entityType, entities.entity_id), 'entity_id');
  143. // 取月收益 (12s)
  144. rets = get_monthly_ret(entityType, entity_info.entity_id, very_old_day, entity_info.end_date.max().temporalFormat('yyyy-MM-dd').temporalParse('yyyy-MM-dd').monthEnd(), isFromMySQL);
  145. // 把 yyyy-MM 格式的 end_date 改成 dolphin 的 MONTH
  146. v_end_date = rets.end_date.temporalParse('yyyy-MM');
  147. rets.replaceColumn!('end_date', v_end_date);
  148. if(!rets.isVoid() && rets.size() > 0) {
  149. // 计算月度指标 (5s)
  150. indicators = cal_monthly_indicators(entityType, 'BFI', rets);
  151. // 仿照MySQL的表结构准备好记录 (1s)
  152. generate_entity_bfi_indicator(entity_info, indicators, true, tb_bfi_indicator);
  153. }
  154. i += batch_size;
  155. } while (i <= v_uniq_entity_id.size());
  156. if(! tb_bfi_indicator.isVoid() && tb_bfi_indicator.size() > 0) {
  157. // save data to MySQL
  158. try {
  159. t_desc = get_bfi_indicator_table_description(entityType);
  160. chg_columns_for_mysql(tb_bfi_indicator, t_desc.sec_id_col[0]);
  161. db_name = t_desc.table_name[0].split('.')[0];
  162. save_and_sync(tb_bfi_indicator, t_desc.table_name[0].strReplace(db_name, 'raw_db'), t_desc.table_name[0].strReplace(db_name, 'raw_db'));
  163. // 数据初始化时将指标存入本地,做排名之用
  164. if(end_day <= 1990.01.01) {
  165. save_table(tb_bfi_indicator, t_desc.table_name[0].strReplace(db_name, 'raw_db'), false);
  166. }
  167. } catch(ex) {
  168. //TODO: Log errors
  169. rt = ex;
  170. }
  171. }
  172. return rt;
  173. }
  174. /*
  175. * 根据收益更新日期计算 RBSA
  176. *
  177. * Example: CalFundRBSATask('MF', ['MF00003PW1'], 2024.10.14T10:00:00);
  178. */
  179. def CalFundRBSATask(entityType, entityIds, updateTime) {
  180. // entityType = 'MF'
  181. //entityIds = ['MF00003PW1']
  182. //updateTime = 2024.10.14T10:00:00
  183. tb_result = table(100:0,
  184. ["entity_id", "asset_type_id", "index_id", "effective_date", "level", "alternative_id", "weighting"],
  185. [iif(entityType=='PF', INT, STRING), STRING, STRING, STRING, INT, STRING, DOUBLE]);
  186. t = get_entity_list_by_weekly_return_updatetime(entityType, entityIds, updateTime, true);
  187. window = 48;
  188. step = 13;
  189. if(t.isVoid() || t.size() == 0) return;
  190. d_rbsa = get_rbsa_index();
  191. for(entity in t) {
  192. for(asset_type in d_rbsa.keys()) {
  193. // 起始日期是最早更新日期再向前推一个时间窗口
  194. res = cal_entity_RBSA(entityType, entity.entity_id, d_rbsa[asset_type], 'w',
  195. t.price_date.temporalAdd(-window, 'w')[0], today(), true, window, step);
  196. // 每日任务只负责更新最新的rbsa结果
  197. latest_date = (EXEC price_date.max() AS price_date FROM res)[0];
  198. tb_result.tableInsert(SELECT entity_id, asset_type, index_id, price_date, level, alternative_id, weights
  199. FROM res WHERE price_date = latest_date);
  200. }
  201. }
  202. save_and_sync(tb_result, 'raw_db.pf_fund_rbsa_breakdown', 'raw_db.pf_fund_rbsa_breakdown');
  203. }
  204. /*
  205. * 【临时】用于数据初始化:只计算收益
  206. *
  207. * @param entityType <STRING>: 'MF', 'HF'...
  208. * @param date <DATETIME>: 净值更新时间
  209. *
  210. */
  211. def ms_calFundReturns() {
  212. rt = '';
  213. very_old_date = 1990.01.01;
  214. // 取基金列表 (27s)
  215. tb_cal_funds = ms_get_fund_list_by_nav_updatetime(NULL, very_old_date);
  216. if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
  217. tb_fund_performance = create_entity_performance();
  218. tb_fund_indicator = create_entity_indicator();
  219. tb_fund_risk_stats = create_entity_risk_stats();
  220. tb_fund_riskadjret_stats = create_entity_riskadjret_stats();
  221. tb_fund_style_stats = create_entity_style_stats();
  222. tb_fund_performance_weekly = create_entity_performance_weekly();
  223. tb_fund_latest_performance = create_entity_latest_performance();
  224. // 分批跑
  225. i = 0;
  226. batch_size = 1000;
  227. do {
  228. funds = tb_cal_funds[i : min(tb_cal_funds.size(), i+batch_size)];
  229. if(funds.isVoid() || funds.size() == 0) break;
  230. // 200ms
  231. fund_info = SELECT entity_id, price_date, inception_date, benchmark_id, ini_value
  232. FROM ej(funds, ms_get_fund_info(funds.entity_id), 'entity_id', 'fund_id');
  233. // 计算月收益 (19s)
  234. tb_nav = ms_get_fund_monthly_nav(fund_info.entity_id);
  235. rets = cal_monthly_returns_by_nav(fund_info, tb_nav);
  236. if(!rets.isVoid() && rets.size() > 0) {
  237. // 计算月度指标 (67s)
  238. rets.rename!('cumulative_nav', 'nav');
  239. indicators = cal_monthly_indicators('MF', 'PBI', rets);
  240. // 仿照MySQL的表结构准备好记录 (1s)
  241. generate_entity_performance(fund_info, indicators, true, tb_fund_performance);
  242. generate_entity_indicator(fund_info, indicators, true, tb_fund_indicator);
  243. generate_entity_risk_stats(fund_info, indicators, true, tb_fund_risk_stats);
  244. generate_entity_riskadjret_stats(fund_info, indicators, true, tb_fund_riskadjret_stats);
  245. generate_entity_style_stats(fund_info, indicators, true, tb_fund_style_stats);
  246. }
  247. // 计算周收益 (49s)
  248. rets_w = cal_weekly_returns('MF', fund_info);
  249. if(! rets_w.isVoid() && rets_w.size() > 0) {
  250. generate_entity_performance_weekly(fund_info, rets_w, true, tb_fund_performance_weekly);
  251. }
  252. // 计算最新收益 (23s)
  253. perf_latest = cal_latest_performance('MF', fund_info, true);
  254. if(! perf_latest.isVoid() && perf_latest.size() > 0) {
  255. generate_entity_latest_performance(fund_info, perf_latest, true, tb_fund_latest_performance);
  256. }
  257. i += batch_size;
  258. // } while (i < batch_size);
  259. } while (i <= tb_cal_funds.size());
  260. if(! tb_fund_performance.isVoid() && tb_fund_performance.size() > 0) {
  261. // save data to MySQL (26m)
  262. try {
  263. chg_columns_for_mysql(tb_fund_performance, 'fund_id');
  264. save_and_sync(tb_fund_performance, 'raw_db.fund_performance', 'raw_db.fund_performance');
  265. chg_columns_for_mysql(tb_fund_indicator, 'fund_id');
  266. save_and_sync(tb_fund_indicator, 'raw_db.fund_indicator', 'raw_db.fund_indicator');
  267. chg_columns_for_mysql(tb_fund_risk_stats, 'fund_id');
  268. // mfdb.fund_performance 表中 maxdrawdown_6m & maxdrawdown_ytd 是虚拟列,这里用数据列顺序强行写入真实列 6m_maxdrawdown & ytd_maxdrawdown (DolphinDB 不允许字段名以数字开头)
  269. save_and_sync(tb_fund_risk_stats, 'raw_db.fund_risk_stats', 'raw_db.fund_risk_stats');
  270. chg_columns_for_mysql(tb_fund_riskadjret_stats, 'fund_id');
  271. save_and_sync(tb_fund_riskadjret_stats, 'raw_db.fund_riskadjret_stats', 'raw_db.fund_riskadjret_stats');
  272. chg_columns_for_mysql(tb_fund_style_stats, 'fund_id');
  273. save_and_sync(tb_fund_style_stats, 'raw_db.fund_style_stats', 'raw_db.fund_style_stats');
  274. save_and_sync(tb_fund_performance_weekly, 'raw_db.fund_performance_weekly', 'raw_db.fund_performance_weekly');
  275. save_and_sync(tb_fund_latest_performance, 'raw_db.fund_latest_performance', 'raw_db.fund_latest_nav_performance');
  276. } catch(ex) {
  277. //TODO: Log errors
  278. rt = ex;
  279. }
  280. }
  281. return rt;
  282. }
  283. /*
  284. * 实验性质的API
  285. *
  286. *
  287. */
  288. def calFundIndexCorrelation(entityType, date) {
  289. if(find(['HF', 'MF'], entityType) < 0) return null;
  290. // 取有最新净值变动的基金列表 (1s)
  291. tb_cal_funds = get_entity_list_by_nav_updatetime(entityType, NULL, date, true);
  292. if(tb_cal_funds.isVoid() || tb_cal_funds.size() == 0 ) return;
  293. // tb_fund_index_coe = create_entity_index_coe();
  294. // (7m)
  295. coe = cal_entity_index_coe(entityType, tb_cal_funds[0:1000]);
  296. return coe;
  297. }