自定义交易环境

TradingEnv 由三个可插拔的部分组成:

Scheme

决定

默认

ActionScheme

agent 的动作空间,以及一个动作如何转换为订单

DefaultAction:3 离散(多 / 空 / 平)

ObserverScheme

观测空间,以及 agent 每步看到什么

DefaultObserver:选定特征列的滚动窗口

RewardScheme

env.step 返回的标量奖励

DefaultReward:本步关闭交易的对数收益,扣除佣金

继承任一抽象基类,把你的实例传给 TradingEnv(...)

from qtrade.env import TradingEnv

env = TradingEnv(
    data=df,
    cash=10_000,
    action_scheme=MyActions(),
    observer_scheme=MyObservations(),
    reward_scheme=MyReward(),
)

不传则使用对应的默认实现。可以自由混搭自定义和默认 scheme。


ActionScheme

需要实现两个方法:

  • action_space —— 一个 gymnasium.spaces.Space,描述 agent 能输出什么。

  • get_orders(action, env) -> list[Order] —— 把选定的动作转换为待下的 Order 列表。返回 [] 表示「什么都不做」。

env 暴露 env.positionenv.equityenv.dataenv.current_time,如需更多可用底层的 env._broker

示例:分档仓位动作

默认 action scheme 用固定 size=1。下面这个让 agent 可以同时选择仓位档位(小 / 中 / 大)和方向:

import gymnasium as gym
from gymnasium.spaces import Space
from qtrade.env import ActionScheme
from qtrade.core import Order

class TieredAction(ActionScheme):
    """6-discrete: {flat, small_long, large_long, small_short, large_short, hold}."""

    SIZES = {"small": 5, "large": 20}

    @property
    def action_space(self) -> Space:
        return gym.spaces.Discrete(6)

    def get_orders(self, action: int, env) -> list[Order]:
        target = {
            0: 0,                            # flat
            1: +self.SIZES["small"],
            2: +self.SIZES["large"],
            3: -self.SIZES["small"],
            4: -self.SIZES["large"],
            5: env.position.size,            # hold
        }[action]
        delta = target - env.position.size
        return [Order(size=delta)] if delta != 0 else []

示例:连续(Box)动作空间

PPO / SAC 想让 agent 直接选目标仓位比例时很有用。

import numpy as np
from gymnasium.spaces import Box
from qtrade.env import ActionScheme
from qtrade.core import Order

class ContinuousAllocation(ActionScheme):
    """Single float in [-1, 1] — fraction of max size, sign = side."""

    def __init__(self, max_size: int = 100):
        self.max_size = max_size

    @property
    def action_space(self):
        return Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

    def get_orders(self, action, env):
        target = int(round(float(action[0]) * self.max_size))
        delta = target - env.position.size
        return [Order(size=delta)] if delta != 0 else []

ObserverScheme

需要实现两个方法:

  • observation_space —— 一个 gymnasium.spaces.Space。确保其 shape / dtypeget_observation 实际返回的匹配。

  • get_observation(env) -> np.ndarray | dict[str, np.ndarray] —— 从 env.dataenv.positionenv._broker 等取你想要的数据,形状与 observation_space 匹配。

默认 observer 返回特征列的滚动窗口。你可以继承它返回原始 OHLCV、技术指标、持仓状态,或者用 Dict 空间组合多种模态。

示例:把持仓状态加入观测

常见需求:让 agent 在看市场数据的同时知道当前仓位大小和浮动盈亏:

import numpy as np
import gymnasium as gym
from qtrade.env import ObserverScheme

class WindowedObsWithPosition(ObserverScheme):
    def __init__(self, window_size: int, features: list[str]):
        self.window_size = window_size
        self.features = features

    @property
    def observation_space(self):
        return gym.spaces.Dict({
            "market": gym.spaces.Box(
                low=-np.inf, high=np.inf,
                shape=(self.window_size, len(self.features)),
                dtype=np.float32,
            ),
            "position": gym.spaces.Box(
                low=-np.inf, high=np.inf, shape=(2,), dtype=np.float32,
            ),
        })

    def get_observation(self, env):
        market = (
            env.data[self.features]
            .iloc[-self.window_size:]
            .values.astype(np.float32)
        )
        position = np.array(
            [env.position.size, env._broker.unrealized_pnl],
            dtype=np.float32,
        )
        return {"market": market, "position": position}

Dict 观测空间在 stable-baselines3 里需要 MultiInputPolicy。大多数现成策略请坚持使用扁平的 Box

示例:用收益率代替原始价格

原始价格序列是非平稳的。大多数 RL 论文会喂对数收益率:

import numpy as np
import gymnasium as gym
from qtrade.env import ObserverScheme

class LogReturnsObserver(ObserverScheme):
    def __init__(self, window_size: int):
        self.window_size = window_size

    @property
    def observation_space(self):
        return gym.spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.window_size,), dtype=np.float32,
        )

    def get_observation(self, env):
        closes = env.data['Close'].iloc[-(self.window_size + 1):].values
        returns = np.diff(np.log(closes)).astype(np.float32)
        return returns

RewardScheme

一个必需方法、一个可选方法:

  • get_reward(env) -> float —— 当前步的标量奖励。

  • reset() —— 可选。env.reset() 时调用。用来清除奖励函数维护的 episode 级状态。

默认奖励是本步关闭交易的对数已实现收益(扣佣金)。还有三种模式经常出现:

示例:基于权益的奖励(每步触发)

from qtrade.env import RewardScheme

class EquityChangeReward(RewardScheme):
    """Reward = change in equity since the previous step."""

    def __init__(self):
        self._prev_equity = None

    def reset(self):
        self._prev_equity = None

    def get_reward(self, env) -> float:
        equity = env._broker.equity
        if self._prev_equity is None:
            self._prev_equity = equity
            return 0.0
        delta = equity - self._prev_equity
        self._prev_equity = equity
        return float(delta)

示例:差分 Sharpe 比率

奖励风险调整后收益而非原始盈亏 —— 见 Moody & Saffell (1998):

from qtrade.env import RewardScheme

class DifferentialSharpe(RewardScheme):
    def __init__(self, eta: float = 0.01):
        self.eta = eta
        self._A = 0.0   # EMA of return
        self._B = 0.0   # EMA of squared return
        self._prev_equity = None

    def reset(self):
        self._A = self._B = 0.0
        self._prev_equity = None

    def get_reward(self, env) -> float:
        equity = env._broker.equity
        if self._prev_equity is None or self._prev_equity == 0:
            self._prev_equity = equity
            return 0.0
        r = (equity - self._prev_equity) / self._prev_equity
        self._prev_equity = equity

        dA = r - self._A
        dB = r * r - self._B
        denom = (self._B - self._A ** 2) ** 1.5
        d_sharpe = (self._B * dA - 0.5 * self._A * dB) / denom if denom > 1e-9 else 0.0

        self._A += self.eta * dA
        self._B += self.eta * dB
        return float(d_sharpe)

示例:回撤惩罚(包装另一个奖励)

一个风险感知的奖励,包装任意基础 scheme 并加上与当前回撤成比例的惩罚:

from qtrade.env import RewardScheme

class DrawdownPenalty(RewardScheme):
    def __init__(self, base: RewardScheme, penalty: float = 1.0):
        self.base = base
        self.penalty = penalty

    def reset(self):
        self.base.reset()

    def get_reward(self, env) -> float:
        eq = env._broker.equity_history.loc[:env.current_time]
        peak = eq.cummax().iloc[-1]
        drawdown = (eq.iloc[-1] - peak) / peak  # ≤ 0
        return self.base.get_reward(env) + self.penalty * float(drawdown)

组合使用

import yfinance as yf
from qtrade.env import TradingEnv
from qtrade.core import PercentageCommission

data = yf.download("GC=F", start="2023-01-01", end="2024-01-01",
                   interval="1d", multi_level_index=False)
data['returns'] = data['Close'].pct_change().fillna(0)

env = TradingEnv(
    data=data,
    cash=10_000,
    commission=PercentageCommission(0.001),
    action_scheme=TieredAction(),
    observer_scheme=WindowedObsWithPosition(
        window_size=10, features=['returns'],
    ),
    reward_scheme=DifferentialSharpe(eta=0.01),
    window_size=10,
    max_steps=200,
)

obs, info = env.reset(seed=42)
for _ in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        break

stable-baselines3 训练这个环境的方法见 Gym 交易环境