Skip to content

API文档

app

Author: Aaron-Yang [code@jieyu.ai] Contributors:

start(impl, cfg=None, **fetcher_params)

启动一组Omega进程

使用本函数来启动一组Omega进程。这一组进程使用同样的quotes fetcher,但可能有1到多个session (限制由quotes fetcher给出)。它们共享同一个port。Sanic在内部实现了load-balance机制。

通过多次调用本方法,传入不同的quotes fetcher impl参数,即可启动多组Omega服务。

如果指定了fetcher_params,则start将使用impl, fetcher_params来启动单组Omega服务,使 用impl指定的fetcher。否则,将使用cfg.quotes_fetcher中提供的信息来创建Omega.

如果cfg不为None,则应该指定为合法的json string,其内容将覆盖本地cfg。这个设置目前的主要 要作用是方便单元测试。

Parameters:

Name Type Description Default
impl str

quotes fetcher implementor

required
cfg dict

the cfg in json string

None
fetcher_params

contains info required by creating quotes fetcher

{}
Source code in omega/app.py
def start(impl: str, cfg: dict = None, **fetcher_params):
    """启动一组Omega进程

    使用本函数来启动一组Omega进程。这一组进程使用同样的quotes fetcher,但可能有1到多个session
    (限制由quotes fetcher给出)。它们共享同一个port。Sanic在内部实现了load-balance机制。

    通过多次调用本方法,传入不同的quotes fetcher impl参数,即可启动多组Omega服务。

    如果指定了`fetcher_params`,则`start`将使用impl, fetcher_params来启动单组Omega服务,使
    用impl指定的fetcher。否则,将使用`cfg.quotes_fetcher`中提供的信息来创建Omega.

    如果`cfg`不为None,则应该指定为合法的json string,其内容将覆盖本地cfg。这个设置目前的主要
    要作用是方便单元测试。


    Args:
        impl (str): quotes fetcher implementor
        cfg: the cfg in json string
        fetcher_params: contains info required by creating quotes fetcher
    """
    sessions = fetcher_params.get("sessions", 1)
    port = fetcher_params.get("port", 3181)
    omega = Omega(impl, cfg, **fetcher_params)

    app.register_listener(omega.init, "before_server_start")

    logger.info("starting sanic group listen on %s with %s workers", port, sessions)
    app.run(
        host="0.0.0.0",
        port=port,
        workers=sessions,
        register_sys_signals=True,
        protocol=WebSocketProtocol,
    )

cli

管理应用程序生命期、全局对象、任务、全局消息响应

bin_cut(arr, n)

将数组arr切分成n份

Parameters:

Name Type Description Default
arr list

[description]

required
n int

[description]

required

Returns:

Type Description
[type]

[description]

Source code in omega/cli.py
def bin_cut(arr: list, n: int):
    """将数组arr切分成n份

    Args:
        arr ([type]): [description]
        n ([type]): [description]

    Returns:
        [type]: [description]
    """
    result = [[] for i in range(n)]

    for i, e in enumerate(arr):
        result[i % n].append(e)

    return [e for e in result if len(e)]

config_fetcher(settings)

配置jq_fetcher

为Omega安装jqdatasdk, zillionare-omega-adaptors-jq, 配置jqdata访问账号

Source code in omega/cli.py
def config_fetcher(settings):
    """配置jq_fetcher

    为Omega安装jqdatasdk, zillionare-omega-adaptors-jq, 配置jqdata访问账号

    """
    msg = """
        Omega需要配置上游行情服务器。当前支持的上游服务器有:\\n
        [1] 聚宽`<joinquant>`\\n
    """
    print(format_msg(msg))
    more_account = True
    workers = []
    port = 3181
    while more_account:
        account = get_input("请输入账号:", None, os.environ.get("JQ_ACCOUNT") or "")
        password = get_input("请输入密码:", None, os.environ.get("JQ_PASSWORD") or "")
        sessions = get_input("请输入并发会话数", None, 1, "默认值[1]")
        workers.append(
            {
                "account": account,
                "password": password,
                "sessions": sessions,
                "port": port,
            }
        )
        port += 1
        more_account = input("继续配置新的账号[y|N]?\n").upper() == "Y"

    settings["quotes_fetchers"] = []
    append_fetcher(settings, {"impl": "jqadaptor", "workers": workers})

config_postgres(settings) async

配置数据连接并进行测试

Source code in omega/cli.py
async def config_postgres(settings):
    """配置数据连接并进行测试"""
    msg = """
        配置数据库并非必须。如果您仅限于在某些场景下使用Zillionare-omega,也可以不配置
        数据库更多信息,\\n请参阅https://readthedocs.org/projects/zillionare-omega/

        \\n跳过此项[S], 任意键继续:
    """
    choice = input(format_msg(msg))
    if choice.upper() == "S":
        return

    action = "R"
    while action == "R":
        host = get_input(
            "请输入服务器地址,", None, os.environ.get("POSTGRES_HOST") or "localhost"
        )
        port = get_input(
            "请输入服务器端口,", is_valid_port, os.environ.get("POSTGRES_PORT") or 5432
        )
        account = get_input("请输入账号,", None, os.environ.get("POSTGRES_USER"))
        password = get_input("请输入密码,", None, os.environ.get("POSTGRES_PASSWORD"))
        dbname = get_input(
            "请输入数据库名,", None, os.environ.get("POSTGRES_DB") or "zillionare"
        )

        print("正在测试Postgres连接...")
        dsn = f"postgres://{account}:{password}@{host}:{port}/{dbname}"
        if await check_postgres(dsn):
            update_config(settings, "postgres.dsn", dsn)
            update_config(settings, "postgres.enabled", True)
            print(f"[{colored('PASS', 'green')}] 数据库连接成功,并成功初始化!")
            return True
        else:
            hint = f"[{colored('FAIL', 'red')}] 忽略错误[C],重新输入[R],退出[Q]"
            action = choose_action(hint)

find_fetcher_processes()

查找所有的omega(fetcher)进程

Omega进程在ps -aux中显示应该包含 omega.app --impl=&ltfetcher&gt --port=&ltport&gt信息

Source code in omega/cli.py
def find_fetcher_processes():
    """查找所有的omega(fetcher)进程

    Omega进程在ps -aux中显示应该包含 omega.app --impl=&ltfetcher&gt --port=&ltport&gt信息
    """
    result = {}
    for p in psutil.process_iter():
        cmd = " ".join(p.cmdline())
        if "omega.app start" in cmd and "--impl" in cmd and "--port" in cmd:

            m = re.search(r"--impl=([^\s]+)", cmd)
            impl = m.group(1) if m else ""

            m = re.search(r"--port=(\d+)", cmd)
            port = m.group(1) if m else ""

            group = f"{impl}:{port}"
            pids = result.get(group, [])
            pids.append(p.pid)
            result[group] = pids

    return result

format_msg(msg)

格式化msg并显示在控制台上

本函数允许在写代码时按格式要求进行缩进和排版,但在输出时,这些格式都会被移除;对较长的文本, 按每80个字符为一行进行输出。

如果需要在msg中插入换行或者制表符,使用\n\t

Parameters:

Name Type Description Default
msg str required
Source code in omega/cli.py
def format_msg(msg: str):
    """格式化msg并显示在控制台上

    本函数允许在写代码时按格式要求进行缩进和排版,但在输出时,这些格式都会被移除;对较长的文本,
    按每80个字符为一行进行输出。

    如果需要在msg中插入换行或者制表符,使用`\\n`和`\\t`。
    args:
        msg:

    returns:
    """
    msg = re.sub(r"\n\s+", "", msg)
    msg = re.sub(r"[\t\n]", "", msg)

    msg = msg.replace("\\t", "\t").replace("\\n", "\n")
    lines = msg.split("\n")

    msg = []
    for line in lines:
        for i in range(int(len(line) / 80 + 1)):
            msg.append(line[i * 80 : min(len(line), (i + 1) * 80)])
    return "\n".join(msg)

setup(reset_factory=False, force=False) async

安装初始化入口

Parameters:

Name Type Description Default
reset_factory

reset to factory settings

False
force

if true, force setup no matter if run already

False
Source code in omega/cli.py
async def setup(reset_factory=False, force=False):
    """安装初始化入口

    Args:
        reset_factory: reset to factory settings
        force: if true, force setup no matter if run already

    Returns:

    """
    msg = """
    Zillionare-omega (大富翁)\\n
    -------------------------\\n
    感谢使用Zillionare-omega -- 高速分布式行情服务器!\\n
    """

    print(format_msg(msg))

    if not force:
        config_file = os.path.join(get_config_dir(), "defaults.yaml")
        if os.path.exists(config_file):
            print(f"{colored('[PASS]', 'green')} 安装程序已在本机上成功运行")
            sys.exit(0)

    if reset_factory:
        import sh

        dst = get_config_dir()
        os.makedirs(dst, exist_ok=True)

        src = os.path.join(factory_config_dir(), "defaults.yaml")
        dst = os.path.join(get_config_dir(), "defaults.yaml")
        sh.cp("-r", src, dst)

    print_title("Step 1. 检测安装环境...")
    settings = load_factory_settings()

    if not check_environment():
        sys.exit(-1)

    print_title("Step 2. 配置日志")
    config_logging(settings)
    print_title("Step 3. 配置上游服务器")
    config_fetcher(settings)
    print_title("Step 4. 配置Redis服务器")
    await config_redis(settings)
    print_title("Step 5. 配置Postgres服务器")
    await config_postgres(settings)
    save_config(settings)

    print_title("Step 6. 下载历史数据")
    config_dir = get_config_dir()
    cfg4py.init(config_dir, False)
    remove_console_log_handler()

    await start("fetcher")
    await download_archive(None)

    print_title("配置已完成。现在为您启动Omega,开启财富之旅!")

    await start("jobs")
    await status()

start(service='') async

启动omega主进程或者任务管理进程

Parameters:

Name Type Description Default
service str

if service is '', then starts fetcher processes.

''
Source code in omega/cli.py
async def start(service: str = ""):
    """启动omega主进程或者任务管理进程

    Args:
        service: if service is '', then starts fetcher processes.

    Returns:

    """
    print(f"正在启动zillionare-omega {colored(service, 'green')}...")

    config_dir = get_config_dir()
    cfg4py.init(config_dir, False)

    if service == "":
        await _start_jobs()
        await _start_fetcher_processes()
    elif service == "jobs":
        return await _start_jobs()
    elif service == "fetcher":
        return await _start_fetcher_processes()
    else:
        print("不支持的服务")

sync_bars(frame=None, codes=None) async

立即同步行情数据

如果frame, codes没有提供,则从配置文件中读取相关信息

Parameters:

Name Type Description Default
frame str None
codes str None
Source code in omega/cli.py
async def sync_bars(frame: str = None, codes: str = None):
    """立即同步行情数据

    如果`frame`, `codes`没有提供,则从配置文件中读取相关信息

    Args:
        frame:
        codes:

    Returns:

    """
    await _init()

    if frame:
        frame_type = FrameType(frame)
        params = syncjobs.load_sync_params(frame_type)
        if codes:
            params["cat"] = None
            params["include"] = codes
        await syncjobs.trigger_bars_sync(params, force=True)
        logger.info("request %s,%s send to workers.", params, codes)
    else:
        for frame_type in itertools.chain(tf.day_level_frames, tf.minute_level_frames):
            params = syncjobs.load_sync_params(frame_type)
            if not params:
                continue
            if codes:
                params["cat"] = None
                params["include"] = codes
            await syncjobs.trigger_bars_sync(params, force=True)

            logger.info("request %s,%s send to workers.", params, codes)

sync_calendar() async

发起同步交易日历请求

Source code in omega/cli.py
async def sync_calendar():
    """发起同步交易日历请求"""
    await _init()
    await syncjobs.trigger_single_worker_sync("calendar")

sync_sec_list() async

发起同步证券列表请求

Source code in omega/cli.py
async def sync_sec_list():
    """发起同步证券列表请求"""
    await _init()

    await syncjobs.trigger_single_worker_sync("security_list")

config special

Author: Aaron-Yang [code@jieyu.ai] Contributors:

core special

Author: Aaron-Yang [code@jieyu.ai] Contributors:

accelerate

Author: Aaron-Yang [code@jieyu.ai] Contributors:

things need speed

merge(left, right, by)

merge two numpy structured arrays by by key

njit fail if one of left, right contains object, not plain type, but the loop is very fast, cost 0.0001 seconds

Parameters:

Name Type Description Default
left [type]

[description]

required
right [type]

[description]

required
by [type]

[description]

required

Returns:

Type Description
[type]

[description]

Source code in omega/core/accelerate.py
def merge(left, right, by):
    """merge two numpy structured arrays by `by` key

    njit fail if one of left, right contains object, not plain type, but the loop is
    very fast, cost 0.0001 seconds

    Args:
        left ([type]): [description]
        right ([type]): [description]
        by ([type]): [description]

    Returns:
        [type]: [description]
    """
    i, j = 0, 0

    while j < len(right) and i < len(left):
        if right[j][by] < left[by][i]:
            j += 1
        elif right[j][by] == left[by][i]:
            left[i] = right[j]
            i += 1
            j += 1
        else:
            i += 1

    return left

sanity

Author: Aaron-Yang [code@jieyu.ai] Contributors:

calc_checksums(day, codes) async

Parameters:

Name Type Description Default
day date required
codes List required

Returns:

Type Description
dict

返回值为以code为键,该证券对应的{周期:checksum}的集合为值的集合

Source code in omega/core/sanity.py
async def calc_checksums(day: datetime.date, codes: List) -> dict:
    """
    Args:
        day:
        codes:

    Returns:
        返回值为以code为键,该证券对应的{周期:checksum}的集合为值的集合
    """
    end_time = arrow.get(day, tzinfo=cfg.tz).replace(hour=15)

    checksums = {}
    for i, code in enumerate(codes):
        try:
            checksum = {}
            d = await cache.get_bars_raw_data(code, day, 1, FrameType.DAY)
            if d:
                checksum[f"{FrameType.DAY.value}"] = xxhash.xxh32_hexdigest(d)
            d = await cache.get_bars_raw_data(code, end_time, 240, FrameType.MIN1)
            if d:
                checksum[f"{FrameType.MIN1.value}"] = xxhash.xxh32_hexdigest(d)

            d = await cache.get_bars_raw_data(code, end_time, 48, FrameType.MIN5)
            if d:
                checksum[f"{FrameType.MIN5.value}"] = xxhash.xxh32_hexdigest(d)

            d = await cache.get_bars_raw_data(code, end_time, 16, FrameType.MIN15)
            if d:
                checksum[f"{FrameType.MIN15.value}"] = xxhash.xxh32_hexdigest(d)

            d = await cache.get_bars_raw_data(code, end_time, 8, FrameType.MIN30)
            if d:
                checksum[f"{FrameType.MIN30.value}"] = xxhash.xxh32_hexdigest(d)

            d = await cache.get_bars_raw_data(code, end_time, 4, FrameType.MIN60)
            if d:
                checksum[f"{FrameType.MIN60.value}"] = xxhash.xxh32_hexdigest(d)

            checksums[code] = checksum
        except Exception as e:
            logger.exception(e)

        if (i + 1) % 500 == 0:
            logger.info("calc checksum progress: %s/%s", i + 1, len(codes))

    return checksums
do_validation(secs=None, start=None, end=None) async

对列表secs中指定的证券行情数据按start到end指定的时间范围进行校验

Parameters:

Name Type Description Default
secs List[str]

[description]. Defaults to None.

None
start str

[description]. Defaults to None.

None
end str

[description]. Defaults to None.

None

Returns:

Type Description
[type]

[description]

Source code in omega/core/sanity.py
async def do_validation(secs: List[str] = None, start: str = None, end: str = None):
    """对列表secs中指定的证券行情数据按start到end指定的时间范围进行校验

    Args:
        secs (List[str], optional): [description]. Defaults to None.
        start (str, optional): [description]. Defaults to None.
        end (str, optional): [description]. Defaults to None.

    Returns:
        [type]: [description]
    """
    logger.info("start validation...")
    report = logging.getLogger("validation_report")

    cfg = cfg4py.init(get_config_dir(), False)

    await emit.start(engine=emit.Engine.REDIS, dsn=cfg.redis.dsn, start_server=True)
    await omicron.init()
    start = int(start or await cache.sys.get("jobs.bars_validation.range.start"))
    if end is None:
        end = tf.date2int(arrow.now().date())
    else:
        end = int(end or await cache.sys.get("jobs.bars_validation.range.stop"))

    if secs is None:

        async def get_sec():
            return await cache.sys.lpop("jobs.bars_validation.scope")

    else:

        async def get_sec():
            return secs.pop() if len(secs) else None

    errors = 0
    while code := await get_sec():
        try:
            for day in tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]:
                expected = await get_checksum(day)
                if expected and expected.get(code):
                    actual = await calc_checksums(tf.int2date(day), [code])
                    d1 = actual.get(code)
                    d2 = expected.get(code)

                    missing1 = d2.keys() - d1  # local has no checksum
                    missing2 = d1.keys() - d2  # remote has no checksum
                    mismatch = {k for k in d1.keys() & d2 if d1[k] != d2[k]}

                    for k in missing1:
                        info = (
                            ValidationError.LOCAL_MISS,
                            day,
                            code,
                            k,
                            d1.get(k),
                            d2.get(k),
                        )
                        report.info("%s,%s,%s,%s,%s,%s", *info)
                        await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
                    for k in missing2:
                        info = (
                            ValidationError.REMOTE_MISS,
                            day,
                            code,
                            k,
                            d1.get(k),
                            d2.get(k),
                        )
                        report.info("%s,%s,%s,%s,%s,%s", *info)
                        await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
                    for k in mismatch:
                        info = (
                            ValidationError.MISMATCH,
                            day,
                            code,
                            k,
                            d1.get(k),
                            d2.get(k),
                        )
                        report.info("%s,%s,%s,%s,%s,%s", *info)
                        await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)

                else:
                    logger.error("checksum for %s not found.", day)
                    info = (ValidationError.NO_CHECKSUM, day, None, None, None, None)
                    report.info("%s,%s,%s,%s,%s,%s", *info)
                    await emit.emit(Events.OMEGA_VALIDATION_ERROR, info)
        except Exception as e:
            logger.exception(e)
            errors += 1

    await emit.emit(Events.OMEGA_VALIDATION_ERROR, (ValidationError.UNKNOWN, errors))
    logger.warning("do_validation meet %s unknown errors", errors)
on_validation_error(report) async

Parameters:

Name Type Description Default
report tuple

object like ::(reason, day, code, frame, local, remote)

required
Source code in omega/core/sanity.py
async def on_validation_error(report: tuple):
    """
    Args:
        report: object like ::(reason, day, code, frame, local, remote)

    Returns:

    """
    global validation_errors, no_validation_error_days

    # todo: raise no checksum issue
    if report[0] == ValidationError.UNKNOWN:
        no_validation_error_days = set()
    else:
        validation_errors.append(report)
        if report[1] is not None:
            no_validation_error_days -= {report[1]}
start_validation() async

将待校验的证券按CPU个数均匀划分,创建与CPU个数相同的子进程来执行校验。校验的起始时间由数据 库中jobs.bars_validation.range.start和jobs.bars_validation.range.stop来决定,每次校验 结束后,将jobs.bars_validation.range.start更新为校验截止的最后交易日。如果各个子进程报告 的截止交易日不一样(比如发生了异常),则使用最小的交易日。

Source code in omega/core/sanity.py
async def start_validation():
    """
    将待校验的证券按CPU个数均匀划分,创建与CPU个数相同的子进程来执行校验。校验的起始时间由数据
    库中jobs.bars_validation.range.start和jobs.bars_validation.range.stop来决定,每次校验
    结束后,将jobs.bars_validation.range.start更新为校验截止的最后交易日。如果各个子进程报告
    的截止交易日不一样(比如发生了异常),则使用最小的交易日。
    """
    global validation_errors, no_validation_error_days
    validation_errors = []

    secs = Securities()

    cpu_count = psutil.cpu_count()

    # to check if the range is right
    pl = cache.sys.pipeline()
    pl.get("jobs.bars_validation.range.start")
    pl.get("jobs.bars_validation.range.end")
    start, end = await pl.execute()

    if start is None:
        if cfg.omega.validation.start is None:
            logger.warning("start of validation is not specified, validation aborted.")
            return
        else:
            start = tf.date2int(arrow.get(cfg.omega.validation.start))
    else:
        start = int(start)

    if end is None:
        end = tf.date2int(tf.floor(arrow.now().date(), FrameType.DAY))
    else:
        end = int(end)

    assert start <= end

    no_validation_error_days = set(
        tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]
    )

    # fixme: do validation per frame_type
    # fixme: test fail. Rewrite this before 0.6 releases
    codes = secs.choose(cfg.omega.sync)
    await cache.sys.delete("jobs.bars_validation.scope")
    await cache.sys.lpush("jobs.bars_validation.scope", *codes)

    logger.info("start validation %s secs from %s to %s.", len(codes), start, end)
    emit.register(Events.OMEGA_VALIDATION_ERROR, on_validation_error)

    t0 = time.time()

    code = (
        "from omega.core.sanity import do_validation_process_entry; "
        "do_validation_process_entry()"
    )

    procs = []
    for i in range(cpu_count):
        proc = subprocess.Popen([sys.executable, "-c", code], env=os.environ)
        procs.append(proc)

    timeout = 3600
    while timeout > 0:
        await asyncio.sleep(2)
        timeout -= 2
        for proc in procs:
            proc.poll()

        if all([proc.returncode is not None for proc in procs]):
            break

    if timeout <= 0:
        for proc in procs:
            try:
                os.kill(proc.pid, signal.SIGTERM)
            except Exception:
                pass

    # set next start point
    validation_days = set(
        tf.day_frames[(tf.day_frames >= start) & (tf.day_frames <= end)]
    )
    diff = validation_days - no_validation_error_days
    if len(diff):
        last_no_error_day = min(diff)
    else:
        last_no_error_day = end

    await cache.sys.set("jobs.bars_validation.range.start", last_no_error_day)
    elapsed = time.time() - t0
    logger.info(
        "Validation cost %s seconds, validation will start at %s next time",
        elapsed,
        last_no_error_day,
    )

fetcher special

quotes fetcher

abstract_quotes_fetcher

This is a awesome python script!

AbstractQuotesFetcher
get_all_trade_days() async classmethod

返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已 经过去的交易日,可以用上证指数来验证。

Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_all_trade_days(cls):
    days = await cls.get_instance().get_all_trade_days()
    await cache.save_calendar("day_frames", map(tf.date2int, days))
    return days
get_bars(sec, end, n_bars, frame_type, include_unclosed=True) async classmethod

获取行情数据,并将已结束的周期数据存入缓存。

各种情况: 1. 假设现在时间是2021-2-24日,盘中。此时请求上证指数日线,且include_unclosedTrue

get_bars("000001.XSHE", None, 1, FrameType.DAY)
得到的数据可能如下:
[(datetime.date(2021, 2, 24), 3638.9358, 3645.5288, 3617.44, 3620.3542, ...)]
在收盘前不同时间调用,得到的数据除开盘价外,其它都实时在变动。

  1. 假设现在时间是2021-2-23日,盘后,此时请求上证指数日线,将得到收盘后固定的价格。

  2. 上述请求中,include_unclosed参数使用默认值(True)。如果取为False,仍以示例1 指定的场景为例,则:

    get_bars("000001.XSHG", None, 1, FrameType.DAY, False)
    
    因为2021-2-24日未收盘,所以获取的最后一条数据是2021-2-23日的。

  3. 同样假设现在时间是2021-2-24日盘中,周三。此时获取周K线。在include_unclosed分别为 TrueFalse的情况下:

    [(datetime.date(2021, 2, 24), 3707.19, 3717.27, 3591.3647, 3592.3977, ...)]
    [(datetime.date(2021, 2, 19), 3721.09, 3731.69, 3634.01, 3696.17, ...)]
    
    注意这里当include_unclosed为True时,返回的周K线是以2021-2-24为Frame的。同样,在盘中 的不同时间取这个数据,除了open数值之外,其它都是实时变化的。

  4. 如果在已结束的周期中,包含停牌数据,则会对停牌期间的数据进行nan填充,以方便数据使用 者可以较容易地分辨出数据不连贯的原因:哪些是停牌造成的,哪些是非交易日造成的。这种处理 会略微降低数据获取速度,并增加存储空间。

比如下面的请求:

get_bars("000029.XSHE", datetime.date(2020,8,18), 10, FrameType.DAY)
将获取到2020-8-5到2020-8-18间共10条数据。但由于期间000029这支股票处于停牌期,所以返回 的10条数据中,数值部分全部填充为np.nan。

注意如果取周线和月线数据,如果当天停牌,但只要周线有数据,则仍能取到。周线(或者月线)的 frame将是停牌前一交易日。比如,

sec = "600721.XSHG"
frame_type = FrameType.WEEK

end = arrow.get("2020-4-29 15:00").datetime
bars = await aq.get_bars(sec, end, 3, FrameType.WEEK)
print(bars)
2020年4月30日是该周的最后一个交易日。股票600721在4月29日停牌一天。上述请求将得到如下数 据:
[(datetime.date(2020, 4, 17), 6.02, 6.69, 5.84, 6.58, ...)
 (datetime.date(2020, 4, 24), 6.51, 6.57, 5.68, 5.72, ...)
 (datetime.date(2020, 4, 28), 5.7, 5.71, 5.17, 5.36, ...)]
停牌发生在日线级别上,但我们的请求发生在周线级别上,所以不会对4/29日进行填充,而是返回 截止到4月29日的数据。

Parameters:

Name Type Description Default
sec str

证券代码

required
end Union[datetime.date, datetime.datetime]

数据截止日

required
n_bars int

待获取的数据条数

required
frame_type FrameType

数据所属的周期

required
include_unclosed

如果为真,则会包含当end所处的那个Frame的数据,即使当前它还未结束

True
Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_bars(
    cls,
    sec: str,
    end: Frame,
    n_bars: int,
    frame_type: FrameType,
    include_unclosed=True,
) -> np.ndarray:
    """获取行情数据,并将已结束的周期数据存入缓存。

    各种情况:
    1. 假设现在时间是2021-2-24日,盘中。此时请求上证指数日线,且`include_unclosed`为
    `True`:
    ```python
    get_bars("000001.XSHE", None, 1, FrameType.DAY)
    ```
    得到的数据可能如下:
    ```
    [(datetime.date(2021, 2, 24), 3638.9358, 3645.5288, 3617.44, 3620.3542, ...)]
    ```
    在收盘前不同时间调用,得到的数据除开盘价外,其它都实时在变动。

    2. 假设现在时间是2021-2-23日,盘后,此时请求上证指数日线,将得到收盘后固定的价格。

    3. 上述请求中,`include_unclosed`参数使用默认值(`True`)。如果取为`False`,仍以示例1
    指定的场景为例,则:
    ```python
    get_bars("000001.XSHG", None, 1, FrameType.DAY, False)
    ```
    因为2021-2-24日未收盘,所以获取的最后一条数据是2021-2-23日的。

    4. 同样假设现在时间是2021-2-24日盘中,周三。此时获取周K线。在`include_unclosed`分别为
    `True`和`False`的情况下:
    ```
    [(datetime.date(2021, 2, 24), 3707.19, 3717.27, 3591.3647, 3592.3977, ...)]
    [(datetime.date(2021, 2, 19), 3721.09, 3731.69, 3634.01, 3696.17, ...)]
    ```
    注意这里当`include_unclosed`为True时,返回的周K线是以2021-2-24为Frame的。同样,在盘中
    的不同时间取这个数据,除了`open`数值之外,其它都是实时变化的。

    5. 如果在已结束的周期中,包含停牌数据,则会对停牌期间的数据进行nan填充,以方便数据使用
    者可以较容易地分辨出数据不连贯的原因:哪些是停牌造成的,哪些是非交易日造成的。这种处理
    会略微降低数据获取速度,并增加存储空间。

    比如下面的请求:
    ```python
    get_bars("000029.XSHE", datetime.date(2020,8,18), 10, FrameType.DAY)
    ```
    将获取到2020-8-5到2020-8-18间共10条数据。但由于期间000029这支股票处于停牌期,所以返回
    的10条数据中,数值部分全部填充为np.nan。

    注意如果取周线和月线数据,如果当天停牌,但只要周线有数据,则仍能取到。周线(或者月线)的
    `frame`将是停牌前一交易日。比如,
    ```python
    sec = "600721.XSHG"
    frame_type = FrameType.WEEK

    end = arrow.get("2020-4-29 15:00").datetime
    bars = await aq.get_bars(sec, end, 3, FrameType.WEEK)
    print(bars)
    ```
    2020年4月30日是该周的最后一个交易日。股票600721在4月29日停牌一天。上述请求将得到如下数
    据:
    ```
    [(datetime.date(2020, 4, 17), 6.02, 6.69, 5.84, 6.58, ...)
     (datetime.date(2020, 4, 24), 6.51, 6.57, 5.68, 5.72, ...)
     (datetime.date(2020, 4, 28), 5.7, 5.71, 5.17, 5.36, ...)]
    ```
    停牌发生在日线级别上,但我们的请求发生在周线级别上,所以不会对4/29日进行填充,而是返回
    截止到4月29日的数据。

    args:
        sec: 证券代码
        end: 数据截止日
        n_bars: 待获取的数据条数
        frame_type: 数据所属的周期
        include_unclosed: 如果为真,则会包含当end所处的那个Frame的数据,即使当前它还未结束
    """
    now = arrow.now(tz=cfg.tz)
    end = end or now.datetime

    # 如果end超出当前时间,则认为是不合法的。如果用户想取到最新的数据,应该传入None
    if type(end) == datetime.date:
        if end > now.date():
            return None
    elif type(end) == datetime.datetime:
        if end > now:
            return None

    bars = await cls.get_instance().get_bars(
        sec, end, n_bars, frame_type.value, include_unclosed
    )

    if len(bars) == 0:
        return

    # 根据指定的end,计算结束时的frame
    last_closed_frame = tf.floor(end, frame_type)

    last_frame = bars[-1]["frame"]

    # 计算有多少根k线是已结束的
    n_closed = n_bars - 1

    if frame_type == FrameType.DAY:
        # 盘后取日线,返回的一定是全部都已closed的数据
        # 盘中取日线,返回的last_frame会是当天的日期,但该日线并未结束
        if now.datetime.hour >= 15 or last_frame < now.date():
            n_closed = n_bars
    else:
        # 如果last_frame <= end的上限,则返回的也一定是全部都closed的数据
        if last_frame <= tf.floor(end, frame_type):
            n_closed = n_bars

    remainder = [bars[-1]] if n_closed < n_bars else None

    closed_bars = cls._fill_na(bars, n_closed, last_closed_frame, frame_type)

    # 只保存已结束的bar
    await cache.save_bars(sec, closed_bars, frame_type)
    if remainder is None:
        return closed_bars
    else:
        return np.concatenate([closed_bars, remainder])
get_security_list() async classmethod

按如下格式返回证券列表。

code display_name name start_date end_date type 000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock

Returns:

Type Description
Union[NoneType, numpy.ndarray]

Union[None, np.ndarray]: [description]

Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_security_list(cls) -> Union[None, np.ndarray]:
    """按如下格式返回证券列表。

    code         display_name   name   start_date   end_date   type
    000001.XSHE   平安银行       PAYH   1991-04-03   2200-01-01 stock

    Returns:
        Union[None, np.ndarray]: [description]
    """
    securities = await cls.get_instance().get_security_list()
    if securities is None or len(securities) == 0:
        logger.warning("failed to update securities. %s is returned.", securities)
        return securities

    key = "securities"
    pipeline = cache.security.pipeline()
    pipeline.delete(key)
    for code, display_name, name, start, end, _type in securities:
        pipeline.rpush(
            key, f"{code},{display_name},{name},{start}," f"{end},{_type}"
        )
    await pipeline.execute()
    return securities
get_valuation(code, day, fields=None, n=1) async classmethod

读取code指定的股票在date指定日期的市值数据。

返回数据包括: code: 股票代码 day: 日期 captialization: 总股本 circulating_cap: 流通股本(万股) market_cap: 总市值(亿元) circulating_market_cap: 流通市值(亿元) turnover_ration: 换手率(%) pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价 格,用来估计股票的投资报酬和风险 pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS pb_ratio: 市净率(PB) ps_ratio: 市销率(PS) pcf_ratio: 市现率(PCF)

Parameters:

Name Type Description Default
code Union[str, List[str]]

[description]

required
day date

[description]

required

Returns:

Type Description
ndarray

numpy.ndarray: [description]

Source code in omega/fetcher/abstract_quotes_fetcher.py
@classmethod
async def get_valuation(
    cls,
    code: Union[str, List[str]],
    day: datetime.date,
    fields: List[str] = None,
    n: int = 1,
) -> np.ndarray:

    valuation = await cls.get_instance().get_valuation(code, day, n)

    await Valuation.save(valuation)

    if fields is None:
        return valuation

    if isinstance(fields, str):
        fields = [fields]

    mapping = dict(valuation.dtype.descr)
    fields = [(name, mapping[name]) for name in fields]
    return rfn.require_fields(valuation, fields)

archive

adjust_range(batch=500) async

adjust secs's range after archive bars imported

Source code in omega/fetcher/archive.py
async def adjust_range(batch: int = 500):
    """adjust secs's range after archive bars imported"""
    cur = b"0"
    key = "archive.ranges.*"
    logger.info("start adjust range")
    while cur:
        cur, keys = await cache.sys.scan(cur, match=key, count=batch)
        if not keys:
            continue

        pl = cache.security.pipeline()
        for item in keys:
            try:
                values = [int(v) for v in await cache.sys.lrange(item, 0, -1)]
                values.sort()

                arc_head, arc_tail = values[0], values[-1]

                code_frame_key = item.replace("archive.ranges.", "")
                head, tail = await cache.security.hmget(code_frame_key, "head", "tail")

                head = int(head) if head is not None else None
                tail = int(tail) if tail is not None else None

                # head, tail, arc_head, arc_tail should be all frame-aligned
                if head is None or tail is None:
                    head, tail = arc_head, arc_tail
                elif arc_tail < head or arc_head > tail:
                    head, tail = arc_head, arc_tail
                else:
                    head = min(arc_head, head)
                    tail = max(arc_tail, tail)
                pl.hset(code_frame_key, "head", head)
                pl.hset(code_frame_key, "tail", tail)
            except Exception as e:
                logger.exception(e)
                logger.warning("failed to set range for %s", code_frame_key)

        await pl.execute()
clear_range() async

clear cached secs's range before/after import archive bars

Source code in omega/fetcher/archive.py
async def clear_range():
    """clear cached secs's range before/after import archive bars"""
    key = "archive.ranges.*"
    keys = await cache.sys.keys(key)

    if keys:
        await cache.sys.delete(*keys)
main(months, cats, archive_server=None)

允许将本模块以独立进程运行,以支持多进程

Parameters:

Name Type Description Default
months str

逗号分隔的月列表。格式如202012

required
cats str

逗号分隔的类别列表,如"stock,index"

required
Source code in omega/fetcher/archive.py
def main(months: str, cats: str, archive_server: str = None):
    """允许将本模块以独立进程运行,以支持多进程

    Args:
        months (str): 逗号分隔的月列表。格式如202012
        cats (str): 逗号分隔的类别列表,如"stock,index"
    """
    config_dir = get_config_dir()
    cfg = cfg4py.init(config_dir, False)

    if archive_server:
        cfg.omega.urls.archive = archive_server

    months = str(months)
    months = [int(x) for x in months.split(",") if x]
    cats = [x for x in cats.split(",")]

    asyncio.run(_main(months, cats))

quotes_fetcher

Interface for quotes fetcher

QuotesFetcher
get_all_trade_days(self) async

返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已 经过去的交易日,可以用上证指数来验证。

Source code in omega/fetcher/quotes_fetcher.py
async def get_all_trade_days(self):
    """
    返回交易日历。不同的服务器可能返回的时间跨度不一样,但相同跨度内的时间应该一样。对已
    经过去的交易日,可以用上证指数来验证。
    """
    raise NotImplementedError
get_bars(self, sec, end, n_bars, frame_type, allow_unclosed=True) async

取n个单位的k线数据。

k线周期由frame_type指定。最后结束周期为end。股票停牌期间的数据会使用None填充。

Parameters:

Name Type Description Default
sec str

证券代码

required
end Union[datetime.date, datetime.datetime] required
n_bars int required
frame_type FrameType required
allow_unclosed bool

为真时,当前未结束的帧数据也获取

True

Returns:

Type Description
ndarray

a numpy.ndarray, with each element is: 'frame': datetime.date or datetime.datetime, depends on frame_type. Denotes which time frame the data belongs . 'open, high, low, close': float 'volume': double 'amount': the buy/sell amount in total, double 'factor': float, may exist or not

Source code in omega/fetcher/quotes_fetcher.py
async def get_bars(
    self,
    sec: str,
    end: Frame,
    n_bars: int,
    frame_type: FrameType,
    allow_unclosed=True,
) -> numpy.ndarray:
    """取n个单位的k线数据。

    k线周期由frame_type指定。最后结束周期为end。股票停牌期间的数据会使用None填充。
    Args:
        sec (str): 证券代码
        end (Frame):
        n_bars (int):
        frame_type (FrameType):
        allow_unclosed (bool): 为真时,当前未结束的帧数据也获取

    Returns:
        a numpy.ndarray, with each element is:
        'frame': datetime.date or datetime.datetime, depends on frame_type.
        Denotes which time frame the data
        belongs .
        'open, high, low, close': float
        'volume': double
        'amount': the buy/sell amount in total, double
        'factor': float, may exist or not
    """
    raise NotImplementedError
get_security_list(self) async

fetch security list from server. The returned list is a numpy.ndarray, which each elements should look like: code display_name name start_date end_date type 000001.XSHE 平安银行 PAYH 1991-04-03 2200-01-01 stock 000002.XSHE 万科A WKA 1991-01-29 2200-01-01 stock

all fields are string type

Returns:

Type Description
ndarray
Source code in omega/fetcher/quotes_fetcher.py
async def get_security_list(self) -> numpy.ndarray:
    """
    fetch security list from server. The returned list is a numpy.ndarray,
    which each elements
    should look like:
    code         display_name name  start_date  end_date     type
    000001.XSHE  平安银行      PAYH  1991-04-03  2200-01-01   stock
    000002.XSHE   万科A        WKA   1991-01-29  2200-01-01   stock

    all fields are string type
    Returns:

    """
    raise NotImplementedError
get_valuation(self, code, day) async

读取code指定的股票在date指定日期的市值数据。

返回数据包括: code: 股票代码 day: 日期 captialization: 总股本 circulating_cap: 流通股本(万股) market_cap: 总市值(亿元) circulating_market_cap: 流通市值(亿元) turnover_ration: 换手率(%) pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价 格,用来估计股票的投资报酬和风险 pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS pb_ratio: 市净率(PB) ps_ratio: 市销率(PS) pcf_ratio: 市现率(PCF)

Parameters:

Name Type Description Default
code Union[str, List[str]]

[description]

required
day Union[datetime.date, datetime.datetime]

[description]

required

Returns:

Type Description
ndarray

numpy.ndarray: [description]

Source code in omega/fetcher/quotes_fetcher.py
async def get_valuation(
    self, code: Union[str, List[str]], day: Frame
) -> numpy.ndarray:
    """读取code指定的股票在date指定日期的市值数据。

    返回数据包括:
        code: 股票代码
        day: 日期
        captialization: 总股本
        circulating_cap: 流通股本(万股)
        market_cap: 总市值(亿元)
        circulating_market_cap: 流通市值(亿元)
        turnover_ration: 换手率(%)
        pe_ratio: 市盈率(PE,TTM)每股市价为每股收益的倍数,反映投资人对每元净利润所愿支付的价
        格,用来估计股票的投资报酬和风险
        pe_ratio_lyr: 市盈率(PE),以上一年度每股盈利计算的静态市盈率. 股价/最近年度报告EPS
        pb_ratio: 市净率(PB)
        ps_ratio: 市销率(PS)
        pcf_ratio: 市现率(PCF)

    Args:
        code (Union[str, List[str]]): [description]
        day (Frame): [description]

    Returns:
        numpy.ndarray: [description]
    """
    raise NotImplementedError

jobs special

syncjobs

load_sync_params(frame_type)

根据指定的frame_type,从配置文件中加载同步参数

Parameters:

Name Type Description Default
frame_type FrameType

[description]

required

Returns:

Type Description
dict

dict: see @[omega.jobs.syncjobs.parse_sync_params]

Source code in omega/jobs/syncjobs.py
def load_sync_params(frame_type: FrameType) -> dict:
    """根据指定的frame_type,从配置文件中加载同步参数

    Args:
        frame_type (FrameType): [description]

    Returns:
        dict: see @[omega.jobs.syncjobs.parse_sync_params]
    """
    for item in cfg.omega.sync.bars:
        if item.get("frame") == frame_type.value:
            try:
                secs, frame_type, start, stop, delay = parse_sync_params(**item)
                return item
            except Exception as e:
                logger.exception(e)
                logger.warning("failed to parse %s", item)
                return None

    return None
parse_sync_params(frame, cat=None, start=None, stop=None, delay=0, include='', exclude='')

按照使用手册中的规则,解析和补全同步参数。

如果frame_type为分钟级,则当start指定为date类型时,自动更正为对应交易日的起始帧; 当stopdate类型时,自动更正为对应交易日的最后一帧。

Parameters:

Name Type Description Default
frame Union[str, datetime.date, datetime.datetime]

frame type to be sync. The word frame is used here for easy understand by end user. It actually implies "FrameType".

required
cat List[str]

which catetories is about to be synced. Should be one of ['stock', 'index']. Defaults to None.

None
start Union[str, datetime.date]

[description]. Defaults to None.

None
stop Union[str, datetime.date, datetime.datetime]

[description]. Defaults to None.

None
delay int

[description]. Defaults to 5.

0
include str

which securities should be included, seperated by space, for example, "000001.XSHE 000004.XSHE". Defaults to empty string.

''
exclude str

which securities should be excluded, seperated by a space. Defaults to empty string.

''

Returns:

Type Description
Tuple
  • codes (List[str]): 待同步证券列表
  • frame_type (FrameType):
  • start (Frame):
  • stop (Frame):
  • delay (int):
Source code in omega/jobs/syncjobs.py
def parse_sync_params(
    frame: Union[str, Frame],
    cat: List[str] = None,
    start: Union[str, datetime.date] = None,
    stop: Union[str, Frame] = None,
    delay: int = 0,
    include: str = "",
    exclude: str = "",
) -> Tuple:
    """按照[使用手册](usage.md#22-如何同步K线数据)中的规则,解析和补全同步参数。

    如果`frame_type`为分钟级,则当`start`指定为`date`类型时,自动更正为对应交易日的起始帧;
    当`stop`为`date`类型时,自动更正为对应交易日的最后一帧。
    Args:
        frame (Union[str, Frame]): frame type to be sync.  The word ``frame`` is used
            here for easy understand by end user. It actually implies "FrameType".
        cat (List[str]): which catetories is about to be synced. Should be one of
            ['stock', 'index']. Defaults to None.
        start (Union[str, datetime.date], optional): [description]. Defaults to None.
        stop (Union[str, Frame], optional): [description]. Defaults to None.
        delay (int, optional): [description]. Defaults to 5.
        include (str, optional): which securities should be included, seperated by
            space, for example, "000001.XSHE 000004.XSHE". Defaults to empty string.
        exclude (str, optional):  which securities should be excluded, seperated by
            a space. Defaults to empty string.

    Returns:
        - codes (List[str]): 待同步证券列表
        - frame_type (FrameType):
        - start (Frame):
        - stop (Frame):
        - delay (int):
    """
    frame_type = FrameType(frame)

    if frame_type in tf.minute_level_frames:
        if stop:
            stop = arrow.get(stop, tzinfo=cfg.tz)
            if stop.hour == 0:  # 未指定有效的时间帧,使用当日结束帧
                stop = tf.last_min_frame(tf.day_shift(stop.date(), 0), frame_type)
            else:
                stop = tf.floor(stop, frame_type)
        else:
            stop = tf.floor(arrow.now(tz=cfg.tz).datetime, frame_type)

        if stop > arrow.now(tz=cfg.tz):
            raise ValueError(f"请勿将同步截止时间设置在未来: {stop}")

        if start:
            start = arrow.get(start, tzinfo=cfg.tz)
            if start.hour == 0:  # 未指定有效的交易帧,使用当日的起始帧
                start = tf.first_min_frame(tf.day_shift(start.date(), 0), frame_type)
            else:
                start = tf.floor(start, frame_type)
        else:
            start = tf.shift(stop, -999, frame_type)
    else:
        stop = (stop and arrow.get(stop).date()) or arrow.now().date()
        if stop == arrow.now().date():
            stop = arrow.now(tz=cfg.tz)

        stop = tf.floor(stop, frame_type)
        start = tf.floor((start and arrow.get(start).date()), frame_type) or tf.shift(
            stop, -1000, frame_type
        )

    secs = Securities()
    codes = secs.choose(cat or [])

    exclude = map(lambda x: x, exclude.split(" "))
    codes = list(set(codes) - set(exclude))

    include = list(filter(lambda x: x, include.split(" ")))
    codes.extend(include)

    return codes, frame_type, start, stop, int(delay)
sync_bars(params) async

sync bars on signal OMEGA_DO_SYNC received

Parameters:

Name Type Description Default
params dict

composed of the following:

{
    secs (List[str]): 待同步的证券标的.如果为None或者为空,则从数据库中轮询
    frame_type (FrameType):k线的帧类型
    start (Frame): k线起始时间
    stop (Frame): k线结束时间
}

required

Returns:

Type Description
[type]

[description]

Source code in omega/jobs/syncjobs.py
async def sync_bars(params: dict):
    """sync bars on signal OMEGA_DO_SYNC received

    Args:
        params (dict): composed of the following:
            ```
            {
                secs (List[str]): 待同步的证券标的.如果为None或者为空,则从数据库中轮询
                frame_type (FrameType):k线的帧类型
                start (Frame): k线起始时间
                stop (Frame): k线结束时间
            }
            ```
    Returns:
        [type]: [description]
    """
    secs, frame_type, start, stop = (
        params.get("secs"),
        params.get("frame_type"),
        params.get("start"),
        params.get("stop"),
    )

    if secs is not None:
        logger.info(
            "sync bars with %s(%s ~ %s) for given %s secs",
            frame_type,
            start,
            stop,
            len(secs),
        )

        async def get_sec():
            return secs.pop() if len(secs) else None

    else:
        logger.info(
            "sync bars with %s(%s ~ %s) in polling mode", frame_type, start, stop
        )

        async def get_sec():
            return await cache.sys.lpop(key_scope)

    key_scope = f"jobs.bars_sync.scope.{frame_type.value}"

    if start is None or frame_type is None:
        raise ValueError("you must specify a start date/frame_type for sync")

    if stop is None:
        stop = tf.floor(arrow.now(tz=cfg.tz), frame_type)

    while code := await get_sec():
        try:
            await sync_bars_for_security(code, frame_type, start, stop)
        except FetcherQuotaError as e:
            logger.warning("Quota exceeded when syncing %s. Sync aborted.", code)
            logger.exception(e)
            return  # stop the sync
        except Exception as e:
            logger.warning("Failed to sync %s", code)
            logger.exception(e)

    elapsed = await _stop_job_timer("sync")
    logger.info("%s finished quotes sync in %s seconds", os.getpid(), elapsed)
sync_calendar() async

从上游服务器获取所有交易日,并计算出周线帧和月线帧

Source code in omega/jobs/syncjobs.py
async def sync_calendar():
    """从上游服务器获取所有交易日,并计算出周线帧和月线帧

    Returns:
    """
    trade_days = await aq.get_all_trade_days()
    if trade_days is None or len(trade_days) == 0:
        logger.warning("failed to fetch trade days.")
        return None

    tf.day_frames = [tf.date2int(x) for x in trade_days]
    weeks = []
    last = trade_days[0]
    for cur in trade_days:
        if cur.weekday() < last.weekday() or (cur - last).days >= 7:
            weeks.append(last)
        last = cur

    if weeks[-1] < last:
        weeks.append(last)

    tf.week_frames = [tf.date2int(x) for x in weeks]
    await cache.save_calendar("week_frames", map(tf.date2int, weeks))

    months = []
    last = trade_days[0]
    for cur in trade_days:
        if cur.day < last.day:
            months.append(last)
        last = cur
    months.append(last)

    tf.month_frames = [tf.date2int(x) for x in months]
    await cache.save_calendar("month_frames", map(tf.date2int, months))
    logger.info("trade_days is updated to %s", trade_days[-1])
sync_security_list() async

更新证券列表

注意证券列表在AbstractQuotesServer取得时就已保存,此处只是触发

Source code in omega/jobs/syncjobs.py
async def sync_security_list():
    """更新证券列表

    注意证券列表在AbstractQuotesServer取得时就已保存,此处只是触发
    """
    secs = await aq.get_security_list()
    logger.info("%s secs are fetched and saved.", len(secs))
trigger_bars_sync(sync_params=None, force=False) async

初始化bars_sync的任务,发信号给各quotes_fetcher进程以启动同步。

Parameters:

Name Type Description Default
frame_type FrameType

要同步的帧类型

required
sync_params dict

同步参数

{
    start: 起始帧
    stop: 截止帧
    frame: 帧类型
    delay: 延迟启动时间,以秒为单位
    cat: 证券分类,如stock, index等
    delay: seconds for sync to wait.
}
see more @omega.jobs.syncjobs.parse_sync_params

None
force

即使当前不是交易日,是否也强行进行同步。

False
Source code in omega/jobs/syncjobs.py
async def trigger_bars_sync(sync_params: dict = None, force=False):
    """初始化bars_sync的任务,发信号给各quotes_fetcher进程以启动同步。

    Args:
        frame_type (FrameType): 要同步的帧类型
        sync_params (dict): 同步参数
            ```
            {
                start: 起始帧
                stop: 截止帧
                frame: 帧类型
                delay: 延迟启动时间,以秒为单位
                cat: 证券分类,如stock, index等
                delay: seconds for sync to wait.
            }
            ```
            see more @[omega.jobs.syncjobs.parse_sync_params][]
        force: 即使当前不是交易日,是否也强行进行同步。
    Returns:

    """
    if not force and not tf.is_trade_day(arrow.now()):
        return

    codes, frame_type, start, stop, delay = parse_sync_params(**sync_params)
    key_scope = f"jobs.bars_sync.scope.{frame_type.value}"

    if len(codes) == 0:
        logger.warning("no securities are specified for sync %s", frame_type)
        return

    fmt_str = "sync from %s to %s in frame_type(%s) for %s secs"
    logger.info(fmt_str, start, stop, frame_type, len(codes))

    # secs are stored into cache, so each fetcher can polling it
    pl = cache.sys.pipeline()
    pl.delete(key_scope)
    pl.lpush(key_scope, *codes)
    await pl.execute()

    await asyncio.sleep(delay)
    await _start_job_timer("sync")

    await emit.emit(
        Events.OMEGA_DO_SYNC, {"frame_type": frame_type, "start": start, "stop": stop}
    )

    fmt_str = "send trigger sync event to fetchers: from %s to %s in frame_type(%s) for %s secs"
    logger.info(fmt_str, start, stop, frame_type, len(codes))
trigger_single_worker_sync(_type, params=None) async

启动只需要单个quotes fetcher进程来完成的数据同步任务

比如交易日历、证券列表等如果需要同时启动多个quotes fetcher进程来完成数据同步任务,应该通过 pyemit来发送广播消息。

Parameters:

Name Type Description Default
_type str

the type of data to be synced, either calendar or ecurity_list

required
Source code in omega/jobs/syncjobs.py
async def trigger_single_worker_sync(_type: str, params: dict = None):
    """启动只需要单个quotes fetcher进程来完成的数据同步任务

    比如交易日历、证券列表等如果需要同时启动多个quotes fetcher进程来完成数据同步任务,应该通过
    pyemit来发送广播消息。

    Args:
        _type: the type of data to be synced, either ``calendar`` or ``ecurity_list``
    """
    url = cfg.omega.urls.quotes_server
    if _type == "calendar":
        url += "/jobs/sync_calendar"
    elif _type == "security_list":
        url += "/jobs/sync_security_list"
    else:
        raise ValueError(f"{_type} is not supported sync type.")

    async with aiohttp.ClientSession() as client:
        try:
            async with client.post(url, data=params) as resp:
                if resp.status != 200:
                    logger.warning("failed to trigger %s sync", _type)
                else:
                    return await resp.json()
        except Exception as e:
            logger.exception(e)