API文档
¶
app
¶
Author: Aaron-Yang [code@jieyu.ai] Contributors:
start(impl, cfg=None, **fetcher_params)
¶
启动一组Omega进程
使用本函数来启动一组Omega进程。这一组进程使用同样的quotes fetcher,但可能有1到多个session (限制由quotes fetcher给出)。它们共享同一个port。Sanic在内部实现了load-balance机制。
通过多次调用本方法,传入不同的quotes fetcher impl参数,即可启动多组Omega服务。
如果指定了fetcher_params
,则start
将使用impl, fetcher_params来启动单组Omega服务,使
用impl指定的fetcher。否则,将使用cfg.quotes_fetcher
中提供的信息来创建Omega.
如果cfg
不为None,则应该指定为合法的json string,其内容将覆盖本地cfg。这个设置目前的主要
要作用是方便单元测试。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
impl |
str |
quotes fetcher implementor |
required |
cfg |
dict |
the cfg in json string |
None |
fetcher_params |
|
contains info required by creating quotes fetcher |
{} |
Source code in omega/app.py
def start(impl: str, cfg: dict = None, **fetcher_params):
"""启动一组Omega进程
使用本函数来启动一组Omega进程。这一组进程使用同样的quotes fetcher,但可能有1到多个session
(限制由quotes fetcher给出)。它们共享同一个port。Sanic在内部实现了load-balance机制。
通过多次调用本方法,传入不同的quotes fetcher impl参数,即可启动多组Omega服务。
如果指定了`fetcher_params`,则`start`将使用impl, fetcher_params来启动单组Omega服务,使
用impl指定的fetcher。否则,将使用`cfg.quotes_fetcher`中提供的信息来创建Omega.
如果`cfg`不为None,则应该指定为合法的json string,其内容将覆盖本地cfg。这个设置目前的主要
要作用是方便单元测试。
Args:
impl (str): quotes fetcher implementor
cfg: the cfg in json string
fetcher_params: contains info required by creating quotes fetcher
"""
sessions = fetcher_params.get("sessions", 1)
port = fetcher_params.get("port", 3181)
omega = Omega(impl, cfg, **fetcher_params)
app.register_listener(omega.init, "before_server_start")
logger.info("starting sanic group listen on %s with %s workers", port, sessions)
app.run(
host="0.0.0.0",
port=port,
workers=sessions,
register_sys_signals=True,
protocol=WebSocketProtocol,
)
cli
¶
管理应用程序生命期、全局对象、任务、全局消息响应
bin_cut(arr, n)
¶
将数组arr切分成n份
Parameters:
Name | Type | Description | Default |
---|---|---|---|
arr |
list |
[description] |
required |
n |
int |
[description] |
required |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omega/cli.py
def bin_cut(arr: list, n: int):
"""将数组arr切分成n份
Args:
arr ([type]): [description]
n ([type]): [description]
Returns:
[type]: [description]
"""
result = [[] for i in range(n)]
for i, e in enumerate(arr):
result[i % n].append(e)
return [e for e in result if len(e)]
config_fetcher(settings)
¶
配置jq_fetcher
为Omega安装jqdatasdk, zillionare-omega-adaptors-jq, 配置jqdata访问账号
Source code in omega/cli.py
def config_fetcher(settings):
"""配置jq_fetcher
为Omega安装jqdatasdk, zillionare-omega-adaptors-jq, 配置jqdata访问账号
"""
msg = """
Omega需要配置上游行情服务器。当前支持的上游服务器有:\\n
[1] 聚宽`<joinquant>`\\n
"""
print(format_msg(msg))
more_account = True
workers = []
port = 3181
while more_account:
account = get_input("请输入账号:", None, os.environ.get("JQ_ACCOUNT") or "")
password = get_input("请输入密码:", None, os.environ.get("JQ_PASSWORD") or "")
sessions = get_input("请输入并发会话数", None, 1, "默认值[1]")
workers.append(
{
"account": account,
"password": password,
"sessions": sessions,
"port": port,
}
)
port += 1
more_account = input("继续配置新的账号[y|N]?\n").upper() == "Y"
settings["quotes_fetchers"] = []
append_fetcher(settings, {"impl": "jqadaptor", "workers": workers})
config_postgres(settings)
async
¶
配置数据连接并进行测试
Source code in omega/cli.py
async def config_postgres(settings):
"""配置数据连接并进行测试"""
msg = """
配置数据库并非必须。如果您仅限于在某些场景下使用Zillionare-omega,也可以不配置
数据库更多信息,\\n请参阅https://readthedocs.org/projects/zillionare-omega/
\\n跳过此项[S], 任意键继续:
"""
choice = input(format_msg(msg))
if choice.upper() == "S":
return
action = "R"
while action == "R":
host = get_input(
"请输入服务器地址,", None, os.environ.get("POSTGRES_HOST") or "localhost"
)
port = get_input(
"请输入服务器端口,", is_valid_port, os.environ.get("POSTGRES_PORT") or 5432
)
account = get_input("请输入账号,", None, os.environ.get("POSTGRES_USER"))
password = get_input("请输入密码,", None, os.environ.get("POSTGRES_PASSWORD"))
dbname = get_input(
"请输入数据库名,", None, os.environ.get("POSTGRES_DB") or "zillionare"
)
print("正在测试Postgres连接...")
dsn = f"postgres://{account}:{password}@{host}:{port}/{dbname}"
if await check_postgres(dsn):
update_config(settings, "postgres.dsn", dsn)
update_config(settings, "postgres.enabled", True)
print(f"[{colored('PASS', 'green')}] 数据库连接成功,并成功初始化!")
return True
else:
hint = f"[{colored('FAIL', 'red')}] 忽略错误[C],重新输入[R],退出[Q]"
action = choose_action(hint)
find_fetcher_processes()
¶
查找所有的omega(fetcher)进程
Omega进程在ps -aux中显示应该包含 omega.app --impl=<fetcher> --port=<port>信息
Source code in omega/cli.py
def find_fetcher_processes():
"""查找所有的omega(fetcher)进程
Omega进程在ps -aux中显示应该包含 omega.app --impl=<fetcher> --port=<port>信息
"""
result = {}
for p in psutil.process_iter():
cmd = " ".join(p.cmdline())
if "omega.app start" in cmd and "--impl" in cmd and "--port" in cmd:
m = re.search(r"--impl=([^\s]+)", cmd)
impl = m.group(1) if m else ""
m = re.search(r"--port=(\d+)", cmd)
port = m.group(1) if m else ""
group = f"{impl}:{port}"
pids = result.get(group, [])
pids.append(p.pid)
result[group] = pids
return result
format_msg(msg)
¶
格式化msg并显示在控制台上
本函数允许在写代码时按格式要求进行缩进和排版,但在输出时,这些格式都会被移除;对较长的文本, 按每80个字符为一行进行输出。
如果需要在msg中插入换行或者制表符,使用\n
和\t
。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
msg |
str |
required |
Source code in omega/cli.py
def format_msg(msg: str):
"""格式化msg并显示在控制台上
本函数允许在写代码时按格式要求进行缩进和排版,但在输出时,这些格式都会被移除;对较长的文本,
按每80个字符为一行进行输出。
如果需要在msg中插入换行或者制表符,使用`\\n`和`\\t`。
args:
msg:
returns:
"""
msg = re.sub(r"\n\s+", "", msg)
msg = re.sub(r"[\t\n]", "", msg)
msg = msg.replace("\\t", "\t").replace("\\n", "\n")
lines = msg.split("\n")
msg = []
for line in lines:
for i in range(int(len(line) / 80 + 1)):
msg.append(line[i * 80 : min(len(line), (i + 1) * 80)])
return "\n".join(msg)
setup(reset_factory=False, force=False)
async
¶
安装初始化入口
Parameters:
Name | Type | Description | Default |
---|---|---|---|
reset_factory |
|
reset to factory settings |
False |
force |
|
if true, force setup no matter if run already |
False |
Source code in omega/cli.py
async def setup(reset_factory=False, force=False):
"""安装初始化入口
Args:
reset_factory: reset to factory settings
force: if true, force setup no matter if run already
Returns:
"""
msg = """
Zillionare-omega (大富翁)\\n
-------------------------\\n
感谢使用Zillionare-omega -- 高速分布式行情服务器!\\n
"""
print(format_msg(msg))
if not force:
config_file = os.path.join(get_config_dir(), "defaults.yaml")
if os.path.exists(config_file):
print(f"{colored('[PASS]', 'green')} 安装程序已在本机上成功运行")
sys.exit(0)
if reset_factory:
import sh
dst = get_config_dir()
os.makedirs(dst, exist_ok=True)
src = os.path.join(factory_config_dir(), "defaults.yaml")
dst = os.path.join(get_config_dir(), "defaults.yaml")
sh.cp("-r", src, dst)
print_title("Step 1. 检测安装环境...")
settings = load_factory_settings()
if not check_environment():
sys.exit(-1)
print_title("Step 2. 配置日志")
config_logging(settings)
print_title("Step 3. 配置上游服务器")
config_fetcher(settings)
print_title("Step 4. 配置Redis服务器")
await config_redis(settings)
print_title("Step 5. 配置Postgres服务器")
await config_postgres(settings)
save_config(settings)
print_title("Step 6. 下载历史数据")
config_dir = get_config_dir()
cfg4py.init(config_dir, False)
remove_console_log_handler()
await start("fetcher")
await download_archive(None)
print_title("配置已完成。现在为您启动Omega,开启财富之旅!")
await start("jobs")
await status()
start(service='')
async
¶
启动omega主进程或者任务管理进程
Parameters:
Name | Type | Description | Default |
---|---|---|---|
service |
str |
if service is '', then starts fetcher processes. |
'' |
Source code in omega/cli.py
async def start(service: str = ""):
"""启动omega主进程或者任务管理进程
Args:
service: if service is '', then starts fetcher processes.
Returns:
"""
print(f"正在启动zillionare-omega {colored(service, 'green')}...")
config_dir = get_config_dir()
cfg4py.init(config_dir, False)
if service == "":
await _start_jobs()
await _start_fetcher_processes()
elif service == "jobs":
return await _start_jobs()
elif service == "fetcher":
return await _start_fetcher_processes()
else:
print("不支持的服务")
sync_bars(frame=None, codes=None)
async
¶
立即同步行情数据
如果frame
, codes
没有提供,则从配置文件中读取相关信息
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
str |
None |
|
codes |
str |
None |
Source code in omega/cli.py
async def sync_bars(frame: str = None, codes: str = None):
"""立即同步行情数据
如果`frame`, `codes`没有提供,则从配置文件中读取相关信息
Args:
frame:
codes:
Returns:
"""
await _init()
if frame:
frame_type = FrameType(frame)
params = syncjobs.load_sync_params(frame_type)
if codes:
params["cat"] = None
params["include"] = codes
await syncjobs.trigger_bars_sync(params, force=True)
logger.info("request %s,%s send to workers.", params, codes)
else:
for frame_type in itertools.chain(tf.day_level_frames, tf.minute_level_frames):
params = syncjobs.load_sync_params(frame_type)
if not params:
continue
if codes:
params["cat"] = None
params["include"] = codes
await syncjobs.trigger_bars_sync(params, force=True)
logger.info("request %s,%s send to workers.", params, codes)
sync_calendar()
async
¶
发起同步交易日历请求
Source code in omega/cli.py
async def sync_calendar():
"""发起同步交易日历请求"""
await _init()
await syncjobs.trigger_single_worker_sync("calendar")
sync_sec_list()
async
¶
发起同步证券列表请求
Source code in omega/cli.py
async def sync_sec_list():
"""发起同步证券列表请求"""
await _init()
await syncjobs.trigger_single_worker_sync("security_list")
config
special
¶
Author: Aaron-Yang [code@jieyu.ai] Contributors:
core
special
¶
Author: Aaron-Yang [code@jieyu.ai] Contributors:
accelerate
¶
Author: Aaron-Yang [code@jieyu.ai] Contributors:
things need speed
merge(left, right, by)
¶
merge two numpy structured arrays by by
key
njit fail if one of left, right contains object, not plain type, but the loop is very fast, cost 0.0001 seconds
Parameters:
Name | Type | Description | Default |
---|---|---|---|
left |
[type] |
[description] |
required |
right |
[type] |
[description] |
required |
by |
[type] |
[description] |
required |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omega/core/accelerate.py
def merge(left, right, by):
"""merge two numpy structured arrays by `by` key
njit fail if one of left, right contains object, not plain type, but the loop is
very fast, cost 0.0001 seconds
Args:
left ([type]): [description]
right ([type]): [description]
by ([type]): [description]
Returns:
[type]: [description]
"""
i, j = 0, 0
while j < len(right) and i < len(left):
if right[j][by] < left[by][i]:
j += 1
elif right[j][by] == left[by][i]:
left[i] = right[j]
i += 1
j += 1
else:
i += 1
return left
sanity
¶
Author: Aaron-Yang [code@jieyu.ai] Contributors:
calc_checksums(day, codes)
async
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
day |
date |
required | |
codes |
List |
required |
Returns:
Type | Description |
---|---|
dict |
返回值为以code为键,该证券对应的{周期:checksum}的集合为值的集合 |
Source code in omega/core/sanity.py
async def calc_checksums(day: datetime.date, codes: List) -> dict:
"""
Args:
day:
codes:
Returns:
返回值为以code为键,该证券对应的{周期:checksum}的集合为值的集合
"""
end_time = arrow.get(day, tzinfo=cfg.tz).replace(hour=15)
checksums = {}
for i, code in enumerate(codes):
try:
checksum = {}
d = await cache.get_bars_raw_data(code, day, 1, FrameType.DAY)
if d:
checksum[f"{FrameType.DAY.value}"] = xxhash.xxh32_hexdigest(d)
d = await cache.get_bars_raw_data(code, end_time, 240, FrameType.MIN1)
if d:
checksum[f"{FrameType.MIN1.value}"] = xxhash.xxh32_hexdigest(d)
d = await cache.get_bars_raw_data(code, end_time, 48, FrameType.MIN5)
if d:
checksum[f"{FrameType.MIN5.value}"] = xxhash.xxh32_hexdigest(d)
d = await cache.get_bars_raw_data(code, end_time, 16, FrameType.MIN15)
if d:
checksum[f"{FrameType.MIN15.value}"] = xxhash.xxh32_hexdigest(d)
d = await cache.get_bars_raw_data(code, end_time, 8, FrameType.MIN30)
if d:
checksum[f"{FrameType.MIN30.value}"] = xxhash.xxh32_hexdigest(d)
d = await cache.get_bars_raw_data(code, end_time, 4, FrameType.MIN60)
if d:
checksum[f"{FrameType.MIN60.value}"] = xxhash.xxh32_hexdigest(d)
checksums[code] = checksum
except Exception as e:
logger.exception(e)
if (i + 1) % 500 == 0:
logger.info("calc checksum progress: %s/%s", i + 1, len(codes))
return checksums
do_validation(secs=None, start=None, end=None)
async
¶
对列表secs中指定的证券行情数据按start到end指定的时间范围进行校验
Parameters:
Name | Type | Description | Default |
---|---|---|---|
secs |
List[str] |
[description]. Defaults to None. |
None |
start |
str |
[description]. Defaults to None. |
None |
end |
str |
[description]. Defaults to None. |
None |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omega/core/sanity.py
async def do_validation(secs: List[str] = None, start: str = None, end: str = None):
"""对列表secs中指定的证券行情数据按start到end指定的时间范围进行校验
Args:
secs (List[str], optional): [description]. Defaults to None.
start (str, optional): [description]. Defaults to None.
end (str, optional): [description]. Defaults to None.
Returns:
[type]: [description]
"""
logger.info("start validation...")
report = logging.getLogger("validation_report")
cfg = cfg4py.init(get_config_dir(), False)
await emit.start(engine=emit.Engine.REDIS, dsn=cfg.redis.dsn, start_server=True)
await omicron.init()
start = int(start or await cache.sys.get("jobs.bars_validation.range.start"))
if end is None:
end = tf.date2int(arrow.now().date())
else:
end = int(end or await cache.sys.get("jobs.bars_validation.range.stop"))
if secs is None:
async def get_sec():
return await cache.sys.lpop("jobs.bars_validation.scope")
else:
async def get_sec():
return secs.pop() if len(secs) else None
errors = 0
while code := await get_sec():
try:
for day in tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]:
expected = await get_checksum(day)
if expected and expected.get(code):
actual = await calc_checksums(tf.int2date(day), [code])
d1 = actual.get(code)
d2 = expected.get(code)
missing1 = d2.keys() - d1 # local has no checksum
missing2 = d1.keys() - d2 # remote has no checksum
mismatch = {k for k in d1.keys() & d2 if d1[k] != d2[k]}
for k in missing1:
info = (
ValidationError.LOCAL_MISS,
day,
code,
k,
d1.get(k),
d2.get(k),
)
report.info("%s,%s,%s,%s,%s,%s", *info)
await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
for k in missing2:
info = (
ValidationError.REMOTE_MISS,
day,
code,
k,
d1.get(k),
d2.get(k),
)
report.info("%s,%s,%s,%s,%s,%s", *info)
await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
for k in mismatch:
info = (
ValidationError.MISMATCH,
day,
code,
k,
d1.get(k),
d2.get(k),
)
report.info("%s,%s,%s,%s,%s,%s", *info)
await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
else:
logger.error("checksum for %s not found.", day)
info = (ValidationError.NO_CHECKSUM, day, None, None, None, None)
report.info("%s,%s,%s,%s,%s,%s", *info)
await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
except Exception as e:
logger.exception(e)
errors += 1
await emit.emit(Events.OMEGA_VALIDATION_ERROR, (ValidationError.UNKNOWN, errors))
logger.warning("do_validation meet %s unknown errors", errors)
on_validation_error(report)
async
¶
Parameters:
Name | Type | Description | Default |
---|---|---|---|
report |
tuple |
object like ::(reason, day, code, frame, local, remote) |
required |
Source code in omega/core/sanity.py
async def on_validation_error(report: tuple):
"""
Args:
report: object like ::(reason, day, code, frame, local, remote)
Returns:
"""
global validation_errors, no_validation_error_days
# todo: raise no checksum issue
if report[0] == ValidationError.UNKNOWN:
no_validation_error_days = set()
else:
validation_errors.append(report)
if report[1] is not None:
no_validation_error_days -= {report[1]}
start_validation()
async
¶
将待校验的证券按CPU个数均匀划分,创建与CPU个数相同的子进程来执行校验。校验的起始时间由数据 库中jobs.bars_validation.range.start和jobs.bars_validation.range.stop来决定,每次校验 结束后,将jobs.bars_validation.range.start更新为校验截止的最后交易日。如果各个子进程报告 的截止交易日不一样(比如发生了异常),则使用最小的交易日。
Source code in omega/core/sanity.py
async def start_validation():
"""
将待校验的证券按CPU个数均匀划分,创建与CPU个数相同的子进程来执行校验。校验的起始时间由数据
库中jobs.bars_validation.range.start和jobs.bars_validation.range.stop来决定,每次校验
结束后,将jobs.bars_validation.range.start更新为校验截止的最后交易日。如果各个子进程报告
的截止交易日不一样(比如发生了异常),则使用最小的交易日。
"""
global validation_errors, no_validation_error_days
validation_errors = []
secs = Securities()
cpu_count = psutil.cpu_count()
# to check if the range is right
pl = cache.sys.pipeline()
pl.get("jobs.bars_validation.range.start")
pl.get("jobs.bars_validation.range.end")
start, end = await pl.execute()
if start is None:
if cfg.omega.validation.start is None:
logger.warning("start of validation is not specified, validation aborted.")
return
else:
start = tf.date2int(arrow.get(cfg.omega.validation.start))
else:
start = int(start)
if end is None:
end = tf.date2int(tf.floor(arrow.now().date(), FrameType.DAY))
else:
end = int(end)
assert start <= end
no_validation_error_days = set(
tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]
)
# fixme: do validation per frame_type
# fixme: test fail. Rewrite this before 0.6 releases
codes = secs.choose(cfg.omega.sync)
await cache.sys.delete("jobs.bars_validation.scope")
await cache.sys.lpush("jobs.bars_validation.scope", *codes)
logger.info("start validation %s secs from %s to %s.", len(codes), start, end)
emit.register(Events.OMEGA_VALIDATION_ERROR, on_validation_error)
t0 = time.time()
code = (
"from omega.core.sanity import do_validation_process_entry; "
"do_validation_process_entry()"
)
procs = []
for i in range(cpu_count):
proc = subprocess.Popen([sys.executable, "-c", code], env=os.environ)
procs.append(proc)
timeout = 3600
while timeout > 0:
await asyncio.sleep(2)
timeout -= 2
for proc in procs:
proc.poll()
if all([proc.returncode is not None for proc in procs]):
break
if timeout <= 0:
for proc in procs:
try:
os.kill(proc.pid, signal.SIGTERM)
except Exception:
pass
# set next start point
validation_days = set(
tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]
)
diff = validation_days - no_validation_error_days
if len(diff):
last_no_error_day = min(diff)
else:
last_no_error_day = end
await cache.sys.set("jobs.bars_validation.range.start", last_no_error_day)
elapsed = time.time() - t0
logger.info(
"Validation cost %s seconds, validation will start at %s next time",
elapsed,
last_no_error_day,
)
fetcher
special
¶
quotes fetcher
abstract_quotes_fetcher
¶
This is a awesome python script!
AbstractQuotesFetcher
¶
get_all_trade_days()
async
classmethod
¶
返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已 经过去的交易日,可以用上证指数来验证。
Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_all_trade_days(cls):
days = await cls.get_instance().get_all_trade_days()
await cache.save_calendar("day_frames", map(tf.date2int, days))
return days
get_bars(sec, end, n_bars, frame_type, include_unclosed=True)
async
classmethod
¶
获取行情数据,并将已结束的周期数据存入缓存。
各种情况:
1. 假设现在时间是2021-2-24日,盘中。此时请求上证指数日线,且include_unclosed
为
True
:
get_bars("000001.XSHE", None, 1, FrameType.DAY)
[(datetime.date(2021, 2, 24), 3638.9358, 3645.5288, 3617.44, 3620.3542, ...)]
-
假设现在时间是2021-2-23日,盘后,此时请求上证指数日线,将得到收盘后固定的价格。
-
上述请求中,
include_unclosed
参数使用默认值(True
)。如果取为False
,仍以示例1 指定的场景为例,则:get_bars("000001.XSHG", None, 1, FrameType.DAY, False)
-
同样假设现在时间是2021-2-24日盘中,周三。此时获取周K线。在
include_unclosed
分别为True
和False
的情况下:[(datetime.date(2021, 2, 24), 3707.19, 3717.27, 3591.3647, 3592.3977, ...)] [(datetime.date(2021, 2, 19), 3721.09, 3731.69, 3634.01, 3696.17, ...)]
include_unclosed
为True时,返回的周K线是以2021-2-24为Frame的。同样,在盘中 的不同时间取这个数据,除了open
数值之外,其它都是实时变化的。 -
如果在已结束的周期中,包含停牌数据,则会对停牌期间的数据进行nan填充,以方便数据使用 者可以较容易地分辨出数据不连贯的原因:哪些是停牌造成的,哪些是非交易日造成的。这种处理 会略微降低数据获取速度,并增加存储空间。
比如下面的请求:
get_bars("000029.XSHE", datetime.date(2020,8,18), 10, FrameType.DAY)
注意如果取周线和月线数据,如果当天停牌,但只要周线有数据,则仍能取到。周线(或者月线)的
frame
将是停牌前一交易日。比如,
sec = "600721.XSHG"
frame_type = FrameType.WEEK
end = arrow.get("2020-4-29 15:00").datetime
bars = await aq.get_bars(sec, end, 3, FrameType.WEEK)
print(bars)
[(datetime.date(2020, 4, 17), 6.02, 6.69, 5.84, 6.58, ...)
(datetime.date(2020, 4, 24), 6.51, 6.57, 5.68, 5.72, ...)
(datetime.date(2020, 4, 28), 5.7, 5.71, 5.17, 5.36, ...)]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sec |
str |
证券代码 |
required |
end |
Union[datetime.date, datetime.datetime] |
数据截止日 |
required |
n_bars |
int |
待获取的数据条数 |
required |
frame_type |
FrameType |
数据所属的周期 |
required |
include_unclosed |
|
如果为真,则会包含当end所处的那个Frame的数据,即使当前它还未结束 |
True |
Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_bars(
cls,
sec: str,
end: Frame,
n_bars: int,
frame_type: FrameType,
include_unclosed=True,
) -> np.ndarray:
"""获取行情数据,并将已结束的周期数据存入缓存。
各种情况:
1. 假设现在时间是2021-2-24日,盘中。此时请求上证指数日线,且`include_unclosed`为
`True`:
```python
get_bars("000001.XSHE", None, 1, FrameType.DAY)
```
得到的数据可能如下:
```
[(datetime.date(2021, 2, 24), 3638.9358, 3645.5288, 3617.44, 3620.3542, ...)]
```
在收盘前不同时间调用,得到的数据除开盘价外,其它都实时在变动。
2. 假设现在时间是2021-2-23日,盘后,此时请求上证指数日线,将得到收盘后固定的价格。
3. 上述请求中,`include_unclosed`参数使用默认值(`True`)。如果取为`False`,仍以示例1
指定的场景为例,则:
```python
get_bars("000001.XSHG", None, 1, FrameType.DAY, False)
```
因为2021-2-24日未收盘,所以获取的最后一条数据是2021-2-23日的。
4. 同样假设现在时间是2021-2-24日盘中,周三。此时获取周K线。在`include_unclosed`分别为
`True`和`False`的情况下:
```
[(datetime.date(2021, 2, 24), 3707.19, 3717.27, 3591.3647, 3592.3977, ...)]
[(datetime.date(2021, 2, 19), 3721.09, 3731.69, 3634.01, 3696.17, ...)]
```
注意这里当`include_unclosed`为True时,返回的周K线是以2021-2-24为Frame的。同样,在盘中
的不同时间取这个数据,除了`open`数值之外,其它都是实时变化的。
5. 如果在已结束的周期中,包含停牌数据,则会对停牌期间的数据进行nan填充,以方便数据使用
者可以较容易地分辨出数据不连贯的原因:哪些是停牌造成的,哪些是非交易日造成的。这种处理
会略微降低数据获取速度,并增加存储空间。
比如下面的请求:
```python
get_bars("000029.XSHE", datetime.date(2020,8,18), 10, FrameType.DAY)
```
将获取到2020-8-5到2020-8-18间共10条数据。但由于期间000029这支股票处于停牌期,所以返回
的10条数据中,数值部分全部填充为np.nan。
注意如果取周线和月线数据,如果当天停牌,但只要周线有数据,则仍能取到。周线(或者月线)的
`frame`将是停牌前一交易日。比如,
```python
sec = "600721.XSHG"
frame_type = FrameType.WEEK
end = arrow.get("2020-4-29 15:00").datetime
bars = await aq.get_bars(sec, end, 3, FrameType.WEEK)
print(bars)
```
2020年4月30日是该周的最后一个交易日。股票600721在4月29日停牌一天。上述请求将得到如下数
据:
```
[(datetime.date(2020, 4, 17), 6.02, 6.69, 5.84, 6.58, ...)
(datetime.date(2020, 4, 24), 6.51, 6.57, 5.68, 5.72, ...)
(datetime.date(2020, 4, 28), 5.7, 5.71, 5.17, 5.36, ...)]
```
停牌发生在日线级别上,但我们的请求发生在周线级别上,所以不会对4/29日进行填充,而是返回
截止到4月29日的数据。
args:
sec: 证券代码
end: 数据截止日
n_bars: 待获取的数据条数
frame_type: 数据所属的周期
include_unclosed: 如果为真,则会包含当end所处的那个Frame的数据,即使当前它还未结束
"""
now = arrow.now(tz=cfg.tz)
end = end or now.datetime
# 如果end超出当前时间,则认为是不合法的。如果用户想取到最新的数据,应该传入None
if type(end) == datetime.date:
if end > now.date():
return None
elif type(end) == datetime.datetime:
if end > now:
return None
bars = await cls.get_instance().get_bars(
sec, end, n_bars, frame_type.value, include_unclosed
)
if len(bars) == 0:
return
# 根据指定的end,计算结束时的frame
last_closed_frame = tf.floor(end, frame_type)
last_frame = bars[-1]["frame"]
# 计算有多少根k线是已结束的
n_closed = n_bars - 1
if frame_type == FrameType.DAY:
# 盘后取日线,返回的一定是全部都已closed的数据
# 盘中取日线,返回的last_frame会是当天的日期,但该日线并未结束
if now.datetime.hour >= 15 or last_frame < now.date():
n_closed = n_bars
else:
# 如果last_frame <= end的上限,则返回的也一定是全部都closed的数据
if last_frame <= tf.floor(end, frame_type):
n_closed = n_bars
remainder = [bars[-1]] if n_closed < n_bars else None
closed_bars = cls._fill_na(bars, n_closed, last_closed_frame, frame_type)
# 只保存已结束的bar
await cache.save_bars(sec, closed_bars, frame_type)
if remainder is None:
return closed_bars
else:
return np.concatenate([closed_bars, remainder])
get_security_list()
async
classmethod
¶
按如下格式返回证券列表。
code display_name name start_date end_date type 000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock
Returns:
Type | Description |
---|---|
Union[NoneType, numpy.ndarray] |
Union[None, np.ndarray]: [description] |
Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_security_list(cls) -> Union[None, np.ndarray]:
"""按如下格式返回证券列表。
code display_name name start_date end_date type
000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock
Returns:
Union[None, np.ndarray]: [description]
"""
securities = await cls.get_instance().get_security_list()
if securities is None or len(securities) == 0:
logger.warning("failed to update securities. %s is returned.", securities)
return securities
key = "securities"
pipeline = cache.security.pipeline()
pipeline.delete(key)
for code, display_name, name, start, end, _type in securities:
pipeline.rpush(
key, f"{code},{display_name},{name},{start}," f"{end},{_type}"
)
await pipeline.execute()
return securities
get_valuation(code, day, fields=None, n=1)
async
classmethod
¶
读取code指定的股票在date指定日期的市值数据。
返回数据包括: code: 股票代码 day: 日期 captialization: 总股本 circulating_cap: 流通股本(万股) market_cap: 总市值(亿元) circulating_market_cap: 流通市值(亿元) turnover_ration: 换手率(%) pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价 格,用来估计股票的投资报酬和风险 pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS pb_ratio: 市净率(PB) ps_ratio: 市销率(PS) pcf_ratio: 市现率(PCF)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
Union[str, List[str]] |
[description] |
required |
day |
date |
[description] |
required |
Returns:
Type | Description |
---|---|
ndarray |
numpy.ndarray: [description] |
Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_valuation(
cls,
code: Union[str, List[str]],
day: datetime.date,
fields: List[str] = None,
n: int = 1,
) -> np.ndarray:
valuation = await cls.get_instance().get_valuation(code, day, n)
await Valuation.save(valuation)
if fields is None:
return valuation
if isinstance(fields, str):
fields = [fields]
mapping = dict(valuation.dtype.descr)
fields = [(name, mapping[name]) for name in fields]
return rfn.require_fields(valuation, fields)
archive
¶
adjust_range(batch=500)
async
¶
adjust secs's range after archive bars imported
Source code in omega/fetcher/archive.py
async def adjust_range(batch: int = 500):
"""adjust secs's range after archive bars imported"""
cur = b"0"
key = "archive.ranges.*"
logger.info("start adjust range")
while cur:
cur, keys = await cache.sys.scan(cur, match=key, count=batch)
if not keys:
continue
pl = cache.security.pipeline()
for item in keys:
try:
values = [int(v) for v in await cache.sys.lrange(item, 0, -1)]
values.sort()
arc_head, arc_tail = values[0], values[-1]
code_frame_key = item.replace("archive.ranges.", "")
head, tail = await cache.security.hmget(code_frame_key, "head", "tail")
head = int(head) if head is not None else None
tail = int(tail) if tail is not None else None
# head, tail, arc_head, arc_tail should be all frame-aligned
if head is None or tail is None:
head, tail = arc_head, arc_tail
elif arc_tail < head or arc_head > tail:
head, tail = arc_head, arc_tail
else:
head = min(arc_head, head)
tail = max(arc_tail, tail)
pl.hset(code_frame_key, "head", head)
pl.hset(code_frame_key, "tail", tail)
except Exception as e:
logger.exception(e)
logger.warning("failed to set range for %s", code_frame_key)
await pl.execute()
clear_range()
async
¶
clear cached secs's range before/after import archive bars
Source code in omega/fetcher/archive.py
async def clear_range():
"""clear cached secs's range before/after import archive bars"""
key = "archive.ranges.*"
keys = await cache.sys.keys(key)
if keys:
await cache.sys.delete(*keys)
main(months, cats, archive_server=None)
¶
允许将本模块以独立进程运行,以支持多进程
Parameters:
Name | Type | Description | Default |
---|---|---|---|
months |
str |
逗号分隔的月列表。格式如202012 |
required |
cats |
str |
逗号分隔的类别列表,如"stock,index" |
required |
Source code in omega/fetcher/archive.py
def main(months: str, cats: str, archive_server: str = None):
"""允许将本模块以独立进程运行,以支持多进程
Args:
months (str): 逗号分隔的月列表。格式如202012
cats (str): 逗号分隔的类别列表,如"stock,index"
"""
config_dir = get_config_dir()
cfg = cfg4py.init(config_dir, False)
if archive_server:
cfg.omega.urls.archive = archive_server
months = str(months)
months = [int(x) for x in months.split(",") if x]
cats = [x for x in cats.split(",")]
asyncio.run(_main(months, cats))
quotes_fetcher
¶
Interface for quotes fetcher
QuotesFetcher
¶
get_all_trade_days(self)
async
¶
返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已 经过去的交易日,可以用上证指数来验证。
Source code in omega/fetcher/quotes_fetcher.py
async def get_all_trade_days(self):
"""
返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已
经过去的交易日,可以用上证指数来验证。
"""
raise NotImplementedError
get_bars(self, sec, end, n_bars, frame_type, allow_unclosed=True)
async
¶
取n个单位的k线数据。
k线周期由frame_type指定。最后结束周期为end。股票停牌期间的数据会使用None填充。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sec |
str |
证券代码 |
required |
end |
Union[datetime.date, datetime.datetime] |
required | |
n_bars |
int |
required | |
frame_type |
FrameType |
required | |
allow_unclosed |
bool |
为真时,当前未结束的帧数据也获取 |
True |
Returns:
Type | Description |
---|---|
ndarray |
a numpy.ndarray, with each element is: 'frame': datetime.date or datetime.datetime, depends on frame_type. Denotes which time frame the data belongs . 'open, high, low, close': float 'volume': double 'amount': the buy/sell amount in total, double 'factor': float, may exist or not |
Source code in omega/fetcher/quotes_fetcher.py
async def get_bars(
self,
sec: str,
end: Frame,
n_bars: int,
frame_type: FrameType,
allow_unclosed=True,
) -> numpy.ndarray:
"""取n个单位的k线数据。
k线周期由frame_type指定。最后结束周期为end。股票停牌期间的数据会使用None填充。
Args:
sec (str): 证券代码
end (Frame):
n_bars (int):
frame_type (FrameType):
allow_unclosed (bool): 为真时,当前未结束的帧数据也获取
Returns:
a numpy.ndarray, with each element is:
'frame': datetime.date or datetime.datetime, depends on frame_type.
Denotes which time frame the data
belongs .
'open, high, low, close': float
'volume': double
'amount': the buy/sell amount in total, double
'factor': float, may exist or not
"""
raise NotImplementedError
get_security_list(self)
async
¶
fetch security list from server. The returned list is a numpy.ndarray, which each elements should look like: code display_name name start_date end_date type 000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock 000002.XSHE 万科A WKA 1991-01-29 2200-01-01 stock
all fields are string type
Returns:
Type | Description |
---|---|
ndarray |
Source code in omega/fetcher/quotes_fetcher.py
async def get_security_list(self) -> numpy.ndarray:
"""
fetch security list from server. The returned list is a numpy.ndarray,
which each elements
should look like:
code display_name name start_date end_date type
000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock
000002.XSHE 万科A WKA 1991-01-29 2200-01-01 stock
all fields are string type
Returns:
"""
raise NotImplementedError
get_valuation(self, code, day)
async
¶
读取code指定的股票在date指定日期的市值数据。
返回数据包括: code: 股票代码 day: 日期 captialization: 总股本 circulating_cap: 流通股本(万股) market_cap: 总市值(亿元) circulating_market_cap: 流通市值(亿元) turnover_ration: 换手率(%) pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价 格,用来估计股票的投资报酬和风险 pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS pb_ratio: 市净率(PB) ps_ratio: 市销率(PS) pcf_ratio: 市现率(PCF)
Parameters:
Name | Type | Description | Default |
---|---|---|---|
code |
Union[str, List[str]] |
[description] |
required |
day |
Union[datetime.date, datetime.datetime] |
[description] |
required |
Returns:
Type | Description |
---|---|
ndarray |
numpy.ndarray: [description] |
Source code in omega/fetcher/quotes_fetcher.py
async def get_valuation(
self, code: Union[str, List[str]], day: Frame
) -> numpy.ndarray:
"""读取code指定的股票在date指定日期的市值数据。
返回数据包括:
code: 股票代码
day: 日期
captialization: 总股本
circulating_cap: 流通股本(万股)
market_cap: 总市值(亿元)
circulating_market_cap: 流通市值(亿元)
turnover_ration: 换手率(%)
pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价
格,用来估计股票的投资报酬和风险
pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS
pb_ratio: 市净率(PB)
ps_ratio: 市销率(PS)
pcf_ratio: 市现率(PCF)
Args:
code (Union[str, List[str]]): [description]
day (Frame): [description]
Returns:
numpy.ndarray: [description]
"""
raise NotImplementedError
jobs
special
¶
syncjobs
¶
load_sync_params(frame_type)
¶
根据指定的frame_type,从配置文件中加载同步参数
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_type |
FrameType |
[description] |
required |
Returns:
Type | Description |
---|---|
dict |
dict: see @[omega.jobs.syncjobs.parse_sync_params] |
Source code in omega/jobs/syncjobs.py
def load_sync_params(frame_type: FrameType) -> dict:
"""根据指定的frame_type,从配置文件中加载同步参数
Args:
frame_type (FrameType): [description]
Returns:
dict: see @[omega.jobs.syncjobs.parse_sync_params]
"""
for item in cfg.omega.sync.bars:
if item.get("frame") == frame_type.value:
try:
secs, frame_type, start, stop, delay = parse_sync_params(**item)
return item
except Exception as e:
logger.exception(e)
logger.warning("failed to parse %s", item)
return None
return None
parse_sync_params(frame, cat=None, start=None, stop=None, delay=0, include='', exclude='')
¶
按照使用手册中的规则,解析和补全同步参数。
如果frame_type
为分钟级,则当start
指定为date
类型时,自动更正为对应交易日的起始帧;
当stop
为date
类型时,自动更正为对应交易日的最后一帧。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame |
Union[str, datetime.date, datetime.datetime] |
frame type to be sync. The word |
required |
cat |
List[str] |
which catetories is about to be synced. Should be one of ['stock', 'index']. Defaults to None. |
None |
start |
Union[str, datetime.date] |
[description]. Defaults to None. |
None |
stop |
Union[str, datetime.date, datetime.datetime] |
[description]. Defaults to None. |
None |
delay |
int |
[description]. Defaults to 5. |
0 |
include |
str |
which securities should be included, seperated by space, for example, "000001.XSHE 000004.XSHE". Defaults to empty string. |
'' |
exclude |
str |
which securities should be excluded, seperated by a space. Defaults to empty string. |
'' |
Returns:
Type | Description |
---|---|
Tuple |
|
Source code in omega/jobs/syncjobs.py
def parse_sync_params(
frame: Union[str, Frame],
cat: List[str] = None,
start: Union[str, datetime.date] = None,
stop: Union[str, Frame] = None,
delay: int = 0,
include: str = "",
exclude: str = "",
) -> Tuple:
"""按照[使用手册](usage.md#22-如何同步K线数据)中的规则,解析和补全同步参数。
如果`frame_type`为分钟级,则当`start`指定为`date`类型时,自动更正为对应交易日的起始帧;
当`stop`为`date`类型时,自动更正为对应交易日的最后一帧。
Args:
frame (Union[str, Frame]): frame type to be sync. The word ``frame`` is used
here for easy understand by end user. It actually implies "FrameType".
cat (List[str]): which catetories is about to be synced. Should be one of
['stock', 'index']. Defaults to None.
start (Union[str, datetime.date], optional): [description]. Defaults to None.
stop (Union[str, Frame], optional): [description]. Defaults to None.
delay (int, optional): [description]. Defaults to 5.
include (str, optional): which securities should be included, seperated by
space, for example, "000001.XSHE 000004.XSHE". Defaults to empty string.
exclude (str, optional): which securities should be excluded, seperated by
a space. Defaults to empty string.
Returns:
- codes (List[str]): 待同步证券列表
- frame_type (FrameType):
- start (Frame):
- stop (Frame):
- delay (int):
"""
frame_type = FrameType(frame)
if frame_type in tf.minute_level_frames:
if stop:
stop = arrow.get(stop, tzinfo=cfg.tz)
if stop.hour == 0: # 未指定有效的时间帧,使用当日结束帧
stop = tf.last_min_frame(tf.day_shift(stop.date(), 0), frame_type)
else:
stop = tf.floor(stop, frame_type)
else:
stop = tf.floor(arrow.now(tz=cfg.tz).datetime, frame_type)
if stop > arrow.now(tz=cfg.tz):
raise ValueError(f"请勿将同步截止时间设置在未来: {stop}")
if start:
start = arrow.get(start, tzinfo=cfg.tz)
if start.hour == 0: # 未指定有效的交易帧,使用当日的起始帧
start = tf.first_min_frame(tf.day_shift(start.date(), 0), frame_type)
else:
start = tf.floor(start, frame_type)
else:
start = tf.shift(stop, -999, frame_type)
else:
stop = (stop and arrow.get(stop).date()) or arrow.now().date()
if stop == arrow.now().date():
stop = arrow.now(tz=cfg.tz)
stop = tf.floor(stop, frame_type)
start = tf.floor((start and arrow.get(start).date()), frame_type) or tf.shift(
stop, -1000, frame_type
)
secs = Securities()
codes = secs.choose(cat or [])
exclude = map(lambda x: x, exclude.split(" "))
codes = list(set(codes) - set(exclude))
include = list(filter(lambda x: x, include.split(" ")))
codes.extend(include)
return codes, frame_type, start, stop, int(delay)
sync_bars(params)
async
¶
sync bars on signal OMEGA_DO_SYNC received
Parameters:
Name | Type | Description | Default |
---|---|---|---|
params |
dict |
composed of the following:
|
required |
Returns:
Type | Description |
---|---|
[type] |
[description] |
Source code in omega/jobs/syncjobs.py
async def sync_bars(params: dict):
"""sync bars on signal OMEGA_DO_SYNC received
Args:
params (dict): composed of the following:
```
{
secs (List[str]): 待同步的证券标的.如果为None或者为空,则从数据库中轮询
frame_type (FrameType):k线的帧类型
start (Frame): k线起始时间
stop (Frame): k线结束时间
}
```
Returns:
[type]: [description]
"""
secs, frame_type, start, stop = (
params.get("secs"),
params.get("frame_type"),
params.get("start"),
params.get("stop"),
)
if secs is not None:
logger.info(
"sync bars with %s(%s ~ %s) for given %s secs",
frame_type,
start,
stop,
len(secs),
)
async def get_sec():
return secs.pop() if len(secs) else None
else:
logger.info(
"sync bars with %s(%s ~ %s) in polling mode", frame_type, start, stop
)
async def get_sec():
return await cache.sys.lpop(key_scope)
key_scope = f"jobs.bars_sync.scope.{frame_type.value}"
if start is None or frame_type is None:
raise ValueError("you must specify a start date/frame_type for sync")
if stop is None:
stop = tf.floor(arrow.now(tz=cfg.tz), frame_type)
while code := await get_sec():
try:
await sync_bars_for_security(code, frame_type, start, stop)
except FetcherQuotaError as e:
logger.warning("Quota exceeded when syncing %s. Sync aborted.", code)
logger.exception(e)
return # stop the sync
except Exception as e:
logger.warning("Failed to sync %s", code)
logger.exception(e)
elapsed = await _stop_job_timer("sync")
logger.info("%s finished quotes sync in %s seconds", os.getpid(), elapsed)
sync_calendar()
async
¶
从上游服务器获取所有交易日,并计算出周线帧和月线帧
Source code in omega/jobs/syncjobs.py
async def sync_calendar():
"""从上游服务器获取所有交易日,并计算出周线帧和月线帧
Returns:
"""
trade_days = await aq.get_all_trade_days()
if trade_days is None or len(trade_days) == 0:
logger.warning("failed to fetch trade days.")
return None
tf.day_frames = [tf.date2int(x) for x in trade_days]
weeks = []
last = trade_days[0]
for cur in trade_days:
if cur.weekday() < last.weekday() or (cur - last).days >= 7:
weeks.append(last)
last = cur
if weeks[-1] < last:
weeks.append(last)
tf.week_frames = [tf.date2int(x) for x in weeks]
await cache.save_calendar("week_frames", map(tf.date2int, weeks))
months = []
last = trade_days[0]
for cur in trade_days:
if cur.day < last.day:
months.append(last)
last = cur
months.append(last)
tf.month_frames = [tf.date2int(x) for x in months]
await cache.save_calendar("month_frames", map(tf.date2int, months))
logger.info("trade_days is updated to %s", trade_days[-1])
sync_security_list()
async
¶
更新证券列表
注意证券列表在AbstractQuotesServer取得时就已保存,此处只是触发
Source code in omega/jobs/syncjobs.py
async def sync_security_list():
"""更新证券列表
注意证券列表在AbstractQuotesServer取得时就已保存,此处只是触发
"""
secs = await aq.get_security_list()
logger.info("%s secs are fetched and saved.", len(secs))
trigger_bars_sync(sync_params=None, force=False)
async
¶
初始化bars_sync的任务,发信号给各quotes_fetcher进程以启动同步。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
frame_type |
FrameType |
要同步的帧类型 |
required |
sync_params |
dict |
同步参数
|
None |
force |
|
即使当前不是交易日,是否也强行进行同步。 |
False |
Source code in omega/jobs/syncjobs.py
async def trigger_bars_sync(sync_params: dict = None, force=False):
"""初始化bars_sync的任务,发信号给各quotes_fetcher进程以启动同步。
Args:
frame_type (FrameType): 要同步的帧类型
sync_params (dict): 同步参数
```
{
start: 起始帧
stop: 截止帧
frame: 帧类型
delay: 延迟启动时间,以秒为单位
cat: 证券分类,如stock, index等
delay: seconds for sync to wait.
}
```
see more @[omega.jobs.syncjobs.parse_sync_params][]
force: 即使当前不是交易日,是否也强行进行同步。
Returns:
"""
if not force and not tf.is_trade_day(arrow.now()):
return
codes, frame_type, start, stop, delay = parse_sync_params(**sync_params)
key_scope = f"jobs.bars_sync.scope.{frame_type.value}"
if len(codes) == 0:
logger.warning("no securities are specified for sync %s", frame_type)
return
fmt_str = "sync from %s to %s in frame_type(%s) for %s secs"
logger.info(fmt_str, start, stop, frame_type, len(codes))
# secs are stored into cache, so each fetcher can polling it
pl = cache.sys.pipeline()
pl.delete(key_scope)
pl.lpush(key_scope, *codes)
await pl.execute()
await asyncio.sleep(delay)
await _start_job_timer("sync")
await emit.emit(
Events.OMEGA_DO_SYNC, {"frame_type": frame_type, "start": start, "stop": stop}
)
fmt_str = "send trigger sync event to fetchers: from %s to %s in frame_type(%s) for %s secs"
logger.info(fmt_str, start, stop, frame_type, len(codes))
trigger_single_worker_sync(_type, params=None)
async
¶
启动只需要单个quotes fetcher进程来完成的数据同步任务
比如交易日历、证券列表等如果需要同时启动多个quotes fetcher进程来完成数据同步任务,应该通过 pyemit来发送广播消息。
Parameters:
Name | Type | Description | Default |
---|---|---|---|
_type |
str |
the type of data to be synced, either |
required |
Source code in omega/jobs/syncjobs.py
async def trigger_single_worker_sync(_type: str, params: dict = None):
"""启动只需要单个quotes fetcher进程来完成的数据同步任务
比如交易日历、证券列表等如果需要同时启动多个quotes fetcher进程来完成数据同步任务,应该通过
pyemit来发送广播消息。
Args:
_type: the type of data to be synced, either ``calendar`` or ``ecurity_list``
"""
url = cfg.omega.urls.quotes_server
if _type == "calendar":
url += "/jobs/sync_calendar"
elif _type == "security_list":
url += "/jobs/sync_security_list"
else:
raise ValueError(f"{_type} is not supported sync type.")
async with aiohttp.ClientSession() as client:
try:
async with client.post(url, data=params) as resp:
if resp.status != 200:
logger.warning("failed to trigger %s sync", _type)
else:
return await resp.json()
except Exception as e:
logger.exception(e)