module fundit::dataPuller /* * MySQL 连接,使用前应确保 loadPlugin("ODBC")已经被运行过 * * Create 20240711 使用ODBC连接MySQL数据库 Joey * */ def connect_mysql() { // 阿里云的mysql被魔改过,当前DolphinDB无法支持MySQL插件,只能用ODBC // loadPlugin("ODBC") // conn = odbc::connect("Driver={MySQL ODBC 9.0 UNICODE Driver};Server=funditdb-dev.mysql.rds.aliyuncs.com;Database=mfdb;User=pf_user;Password=MzBlMDA0OG", "MySQL") // 使用Windows的ODBC数据源事先设置号的连接 // conn = odbc::connect("Dsn=FunditDB-mfdb") conn = odbc::connect("Dsn=FunditDB-dev-mfdb") // t = odbc::query(conn, "SELECT * FROM pfdb.pf_portfolio_nav LIMIT 100") return conn } /* * 取本地数据库 * * get_local_database("fundit", "mfdb") */ def get_local_database(server_name, db_name) { db = database(directory="D:/Program Files/DolphinDB/server/database/" + server_name + "/" + db_name + "/") return db } /* * 读本地dolphindb数据表 * * load_table_from_local("fundit", mfdb.fund_performance") */ def load_table_from_local(server_name, table_name) { db = get_local_database(server_name, table_name.split(".")[0]) return loadTable(db, table_name.split(".")[1]) } /* * 存数据表到mySQL或本地dolphindb,原数据会被替代! * * save_table(tb_fund_performance, "mfdb.fund_performance", false) */ def save_table(tb, table_name, isToMySQL) { if(isToMySQL == true) { tb.addColumn("creatorid" "createtime" "updaterid" "updatetime" "isvalid", [INT, DATETIME, INT, DATETIME, INT]) UPDATE tb SET creatorid = 888888, createtime = now(), updaterid = null, updatetime = null, isvalid = 1 conn = connect_mysql() odbc::execute(conn, "TRUNCATE TABLE " + table_name + "_dolphin") odbc::append(conn, tb, table_name + "_dolphin", false) conn.close() } else { db = get_local_database("fundit", table_name.split(".")[0]) saveTable(db, tb, table_name.split(".")[1]) } } /* * 取指数周收益 * * get_index_weekly_rets("'FA00000WKG','FA00000WKH','IN0000007G'", 1990.01.01, today()) */ def get_index_weekly_rets(index_ids, start_date, end_date) { s_start_date = iif(start_date.isNull(), "", " AND price_date >= '" + start_date$STRING + "'") s_end_date = iif(end_date.isNull(), "", " AND price_date <= '" + end_date$STRING + "'") s_query = "SELECT factor_id AS index_id, year_week, price_date, factor_value AS cumulative_nav, ret_1w FROM pfdb.cm_factor_performance_weekly WHERE isvalid = 1 AND factor_id IN (" + index_ids + ")" + s_start_date + s_end_date + " AND ret_1w IS NOT NULL UNION SELECT fund_id AS index_id, year_week, price_date, cumulative_nav, ret_1w FROM mfdb.fund_performance_weekly WHERE isvalid = 1 AND fund_id IN (" + index_ids + ")" + s_start_date + s_end_date + " AND ret_1w IS NOT NULL ORDER BY year_week" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取基金周收益 * * * get_fund_weekly_rets("'MF00003TMH','MF00003UQM'", 1990.01.01, null, true) */ def get_fund_weekly_rets(fund_ids, start_date, end_date, isFromMySQL) { s_fund_id = iif(fund_ids.isNull(), "", " AND fund_id IN (" + fund_ids + ")") s_start_date = iif(start_date.isNull(), "", " AND price_date >= '" + start_date$STRING + "'") s_end_date = iif(end_date.isNull(), "", " AND price_date <= '" + end_date$STRING + "'") s_query = "SELECT fund_id, year_week, price_date, cumulative_nav, ret_1w FROM mfdb.fund_performance_weekly WHERE isvalid = 1 " + s_fund_id + s_start_date + s_end_date + " AND ret_1w IS NOT NULL ORDER BY fund_id, year_week" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取组合周收益 * TODO: 增加从本地取数据的功能 * * * get_portfolio_weekly_rets("166002,364640", 1990.01.01, today(), true) */ def get_portfolio_weekly_rets(portfolio_ids, start_date, end_date, isFromMySQL) { s_portfolio_id = iif(portfolio_ids.isNull(), "", " AND portfolio_id IN (" + portfolio_ids + ")") s_query = "SELECT portfolio_id, year_week, price_date, cumulative_nav, ret_1w FROM pfdb.pf_portfolio_performance_weekly WHERE isvalid = 1 " + s_portfolio_id + " AND ret_1w IS NOT NULL AND price_date BETWEEN '" + start_date$STRING + "' AND '" + end_date$STRING + "' ORDER BY portfolio_id, year_week" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取公私募基金月收益 * * get_fund_monthly_ret("'MF00003PW1','MF00003PW1'", 1990.01.01, today(), true) */ def get_fund_monthly_ret(fund_ids, start_date, end_date, isFromMySQL) { yyyymm_start = start_date.temporalFormat("yyyy-MM") yyyymm_end = end_date.temporalFormat("yyyy-MM") if(isFromMySQL == true) { ret_table_name = "mfdb.fund_performance" s_query = "SELECT fund_id, end_date, price_date, ret_1m AS ret, cumulative_nav AS nav, ret_ytd_a, ret_incep_a FROM " + ret_table_name + " WHERE fund_id IN (" + fund_ids + ") AND isvalid = 1 AND cumulative_nav > 0 AND end_date BETWEEN '" + yyyymm_start + "' AND '" + yyyymm_end + "' ORDER BY fund_id, end_date" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() } else { tb_local = load_table_from_local("fundit", "mfdb.fund_performance") s_col = (sqlCol("fund_id"), sqlCol("end_date"), sqlColAlias(, "ret"), sqlColAlias(, "nav"), sqlCol("ret_ytd_a"), sqlCol("ret_incep_a")) s_where = expr(, in, fund_ids.strReplace("'", "").split(",")) t = sql(s_col, tb_local, s_where).eval() } return t } /* * 取无风险月度利率 * * get_risk_free_rate("IN0000000M", 1990.01.01, today()) */ def get_risk_free_rate(index_id, start_date, end_date) { return get_fund_monthly_ret(index_id, start_date, end_date, true) } /* * 取基金最新收益及净值 * * get_fund_latest_nav_performance("'HF000004KN','HF00018WXG'") */ def get_fund_latest_nav_performance(fund_ids, isFromMySQL) { if(isFromMySQL == true) { s_query = "SELECT * FROM mfdb.fund_latest_nav_performance WHERE fund_id IN (" + fund_ids + ") AND isvalid = 1 ORDER BY fund_id" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() } else { tb_local = load_table_from_local("fundit", "mfdb.fund_latest_nav_performance") s_col = sqlCol("*") s_where = expr(, in, fund_ids.strReplace("'", "").split(",")) t = sql(s_col, tb_local, s_where).eval() } return t } /* * 取私募基金净值 * * * * Create: 202408 Joey * TODO: add isvalid and nav > 0 for local version * * * Example: get_hedge_fund_nav_by_price_date("'HF000004KN','HF00018WXG'", 2024.05.01, true) */ def get_hedge_fund_nav_by_price_date(fund_ids, price_date, isFromMySQL) { s_fund_ids = ''; // 判断输入的 fund_ids 是字符串标量还是向量 if ( fund_ids.form() == 0 ) { s_fund_ids = fund_ids; } else { s_fund_ids = "'" + fund_ids.concat("','") + "'"; } if(isFromMySQL == true) { nav_table_name = "mfdb.nav" s_query = "SELECT fund_id, price_date, cumulative_nav FROM " + nav_table_name + " WHERE fund_id IN (" + s_fund_ids + ") AND isvalid = 1 AND cumulative_nav > 0 AND price_date >= '" + price_date$STRING + "' ORDER BY fund_id, price_date" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() } else { tb_local = load_table_from_local("fundit", "mfdb.nav") s_col = sqlCol("*") s_where = [expr(, in, fund_ids.strReplace("'", "").split(",")), = price_date>] t = sql(s_col, tb_local, s_where).eval() } return t } /* * 取指数因子点位 * * get_index_nav_by_price_date("'IN00000008','FA00000WKG'", 2024.06.01) */ def get_index_nav_by_price_date(index_ids, price_date) { s_query = "SELECT index_id, price_date, close AS cumulative_nav FROM mfdb.market_indexes WHERE index_id IN (" + index_ids + ") AND isvalid = 1 AND close > 0 AND price_date >= '" + price_date + "' UNION SELECT index_id AS index_id, price_date, index_value AS cumulative_nav FROM mfdb.indexes_ty_index WHERE index_id IN (" + index_ids + ") AND isvalid = 1 AND index_value > 0 AND price_date >= '" + price_date + "' UNION SELECT factor_id AS index_id, price_date, factor_value AS cumulative_nav FROM pfdb.cm_factor_value WHERE factor_id IN (" + index_ids + ") AND isvalid = 1 AND factor_value > 0 AND price_date >= '" + price_date + "' ORDER BY price_date" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取有效基金基本信息 * * get_fund_info("'HF000004KN','HF00018WXG'") */ def get_fund_info(fund_ids) { s_query = "SELECT fi.fund_id, fi.inception_date, fi.primary_benchmark_id AS benchmark_id, IFNULL(fi.initial_unit_value, 1) AS ini_value, fs.strategy, fs.substrategy FROM mfdb.fund_information fi INNER JOIN mfdb.fund_strategy fs ON fi.fund_id = fs.fund_id AND fs.isvalid = 1 WHERE fi.fund_id IN (" + fund_ids + ") AND fi.isvalid = 1 ORDER BY fi.fund_id" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取私募基金净值更新信息, 返回基金及其净值更新的最早净值日期 * * @param fund_ids: fund_id STRING VECTOR * @param update_time: all updates after this time * * Example: get_fund_list_by_nav_updatetime(null, 2024.07.19T10:00:00) * */ def get_fund_list_by_nav_updatetime(fund_ids, updatetime) { s_fund_sql = ''; // 这里要用 isVoid, 因为 isNull对向量返回的是布尔向量 if (! isVoid(fund_ids)){ s_fund_ids = fund_ids.concat("','"); s_fund_sql = " AND fi.fund_id IN ('" + s_fund_ids + "')"; } s_query = "SELECT fi.fund_id, MIN(nav.price_date) AS price_date, fi.inception_date, fi.primary_benchmark_id AS benchmark_id, IFNULL(fi.initial_unit_value, 1) AS ini_value FROM mfdb.fund_information fi INNER JOIN mfdb.nav ON fi.fund_id = nav.fund_id WHERE fi.isvalid = 1" + s_fund_sql + " AND nav.cumulative_nav > 0 AND nav.updatetime >= '" + updatetime + "' GROUP BY fi.fund_id ORDER BY fi.fund_id" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 存私募基金净值到本地dolphindb * * save_hedge_fund_nav_to_local(tb_nav) */ def save_hedge_fund_nav_to_local(tb_nav) { save_table(tb_nav, "mfdb.nav", false) } /* * 存私募基金净值到本地dolphindb * * get_portfolio_holding_history("166002,364640") */ def get_portfolio_holding_history(portfolio_ids) { s_query = "SELECT portfolio_id, holding_date, fund_id, amount, fund_share, ROUND(amount/fund_share, 6) as nav FROM pfdb.pf_portfolio_fund_history WHERE portfolio_id IN (" + portfolio_ids + ") AND isvalid = 1 ORDER BY portfolio_id, holding_date" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t } /* * 取私募基金用于月末 fund_performance 表更新的净值 * * @param fund_ids: 逗号分隔的ID字符串, 每个ID都有'' * @param month_end: 月末日期字符串 YYYY-MM * * */ def get_nav_for_hedge_fund_performance(fund_ids, month_end) { s_query = "CALL pfdb.sp_get_nav_for_fund_performance(" + fund_ids + ", '" + month_end + "', 1);" conn = connect_mysql() t = odbc::query(conn, s_query) conn.close() return t }