自定义交易环境¶
TradingEnv 由三个可插拔的部分组成:
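| Scheme | 决定 | 默认 |
|---|---|---|
| ActionScheme | agent 的动作空间,以及一个动作如何转换为订单 | 固定 size=1 的动作 |
| ObserverScheme | 观测空间,以及 agent 每步看到什么 | 特征列的滚动窗口 |
| RewardScheme | 每一步的标量奖励 | 本步关闭交易的对数已实现收益(扣佣金) |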
继承任一抽象基类,把你的实例传给 TradingEnv(...):
from qtrade.env import TradingEnv
# Pass your own scheme instances to TradingEnv; any scheme argument that is
# omitted falls back to the corresponding default implementation.
env = TradingEnv(
    data=df,
    cash=10_000,
    action_scheme=MyActions(),
    observer_scheme=MyObservations(),
    reward_scheme=MyReward(),
)
不传则使用对应的默认实现。可以自由混搭自定义和默认 scheme。
ActionScheme¶
需要实现两个方法:
- action_space —— 一个 gymnasium.spaces.Space,描述 agent 能输出什么。
- get_orders(action, env) -> list[Order] —— 把选定的动作转换为待下的 Order 列表。返回 [] 表示「什么都不做」。
env 暴露 env.position、env.equity、env.data、env.current_time,如需更多可用底层的 env._broker。
示例:分档仓位动作¶
默认 action scheme 用固定 size=1。下面这个让 agent 可以同时选择仓位档位(小 / 大)和方向,也可以平仓或保持当前仓位:
import gymnasium as gym
from gymnasium.spaces import Space
from qtrade.env import ActionScheme
from qtrade.core import Order
class TieredAction(ActionScheme):
    """Six discrete actions, each mapping to a target position size.

    Action ids: 0 = flat, 1 = small long, 2 = large long, 3 = small short,
    4 = large short, 5 = hold (keep the current position unchanged).
    """

    # Absolute position size used for each tier.
    SIZES = {"small": 5, "large": 20}

    @property
    def action_space(self) -> Space:
        """Return the ``Discrete(6)`` space the agent picks actions from."""
        return gym.spaces.Discrete(6)

    def get_orders(self, action: int, env) -> list[Order]:
        """Translate a discrete action into the order reaching its target size.

        Args:
            action: Action id in ``0..5``. Plain ints, NumPy integer scalars
                and 0-d integer arrays are all accepted.
            env: The trading environment; only ``env.position.size`` is read.

        Returns:
            A single-element list with the order that moves the position to
            the target size, or ``[]`` when the position is already there.

        Raises:
            KeyError: If ``action`` is not one of the six known ids.
        """
        # A policy may hand back a 0-d NumPy array, which is unhashable and
        # would break the dict lookup below; normalise to a built-in int.
        action = int(action)
        current = env.position.size
        targets = {
            0: 0,  # flat
            1: +self.SIZES["small"],
            2: +self.SIZES["large"],
            3: -self.SIZES["small"],
            4: -self.SIZES["large"],
            5: current,  # hold
        }
        delta = targets[action] - current
        return [Order(size=delta)] if delta != 0 else []
示例:连续(Box)动作空间¶
PPO / SAC 想让 agent 直接选目标仓位比例时很有用。
import numpy as np
from gymnasium.spaces import Box
from qtrade.env import ActionScheme
from qtrade.core import Order
class ContinuousAllocation(ActionScheme):
    """Continuous action: a single float in [-1, 1] selecting the target size.

    The magnitude is the fraction of ``max_size`` to hold and the sign selects
    the side (positive = long, negative = short).
    """

    def __init__(self, max_size: int = 100):
        """Store the largest absolute position size the agent may target."""
        self.max_size = max_size

    @property
    def action_space(self):
        """Return the one-dimensional ``Box`` bounded to [-1, 1]."""
        return Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

    def get_orders(self, action, env):
        """Convert the requested allocation into an order toward the target.

        Args:
            action: Array-like whose first element is the requested fraction.
                A bare scalar or 0-d array is accepted as well.
            env: The trading environment; only ``env.position.size`` is read.

        Returns:
            A single-element list with the order that moves the position to
            the target size, or ``[]`` when no change is needed.
        """
        raw = np.asarray(action, dtype=np.float64).reshape(-1)[0]
        # Enforce the declared bounds so an out-of-range policy output can
        # never request more than ``max_size`` in either direction.
        fraction = float(np.clip(raw, -1.0, 1.0))
        target = int(round(fraction * self.max_size))
        delta = target - env.position.size
        return [Order(size=delta)] if delta != 0 else []
ObserverScheme¶
需要实现两个方法:
- observation_space —— 一个 gymnasium.spaces.Space。确保其 shape / dtype 和 get_observation 实际返回的匹配。
- get_observation(env) -> np.ndarray | dict[str, np.ndarray] —— 从 env.data、env.position、env._broker 等取你想要的数据,形状与 observation_space 匹配。
默认 observer 返回特征列的滚动窗口。你可以继承它返回原始 OHLCV、技术指标、持仓状态,或者用 Dict 空间组合多种模态。
示例:把持仓状态加入观测¶
常见需求:让 agent 在看市场数据的同时知道当前仓位大小和浮动盈亏:
import numpy as np
import gymnasium as gym
from qtrade.env import ObserverScheme
class WindowedObsWithPosition(ObserverScheme):
    """Dict observation combining a feature window with position state.

    ``"market"`` holds the last ``window_size`` rows of the selected feature
    columns; ``"position"`` holds ``[position size, unrealized PnL]``.
    """

    def __init__(self, window_size: int, features: list[str]):
        """Remember the window length and the feature columns to expose."""
        self.features = features
        self.window_size = window_size

    @property
    def observation_space(self):
        """Return the ``Dict`` space matching ``get_observation``'s output."""
        unbounded = {"low": -np.inf, "high": np.inf, "dtype": np.float32}
        market_shape = (self.window_size, len(self.features))
        return gym.spaces.Dict({
            "market": gym.spaces.Box(shape=market_shape, **unbounded),
            "position": gym.spaces.Box(shape=(2,), **unbounded),
        })

    def get_observation(self, env):
        """Build the ``{"market", "position"}`` observation for this step."""
        # NOTE(review): takes the trailing rows of env.data — assumes env.data
        # only exposes rows up to the current step; confirm against TradingEnv.
        window = env.data[self.features].iloc[-self.window_size:]
        market = window.values.astype(np.float32)
        state = [env.position.size, env._broker.unrealized_pnl]
        position = np.array(state, dtype=np.float32)
        return {"market": market, "position": position}
Dict 观测空间在 stable-baselines3 里需要 MultiInputPolicy。如果使用大多数现成策略,请坚持使用扁平的 Box。
示例:用收益率代替原始价格¶
原始价格序列是非平稳的。大多数 RL 论文会喂对数收益率:
import numpy as np
import gymnasium as gym
from qtrade.env import ObserverScheme
class LogReturnsObserver(ObserverScheme):
    """Observe log returns of the ``Close`` column instead of raw prices."""

    def __init__(self, window_size: int):
        """Store how many log returns each observation contains."""
        self.window_size = window_size

    @property
    def observation_space(self):
        """Return an unbounded float32 ``Box`` of length ``window_size``."""
        return gym.spaces.Box(
            low=-np.inf,
            high=np.inf,
            shape=(self.window_size,),
            dtype=np.float32,
        )

    def get_observation(self, env):
        """Return the log returns computed from the trailing closes."""
        # One extra price is needed because N returns require N + 1 prices.
        n_prices = self.window_size + 1
        closes = env.data['Close'].iloc[-n_prices:].values
        log_prices = np.log(closes)
        log_returns = log_prices[1:] - log_prices[:-1]
        return log_returns.astype(np.float32)
RewardScheme¶
一个必需方法、一个可选方法:
- get_reward(env) -> float —— 当前步的标量奖励。
- reset() —— 可选。env.reset() 时调用,用来清除奖励函数维护的 episode 级状态。
默认奖励是本步关闭交易的对数已实现收益(扣佣金)。还有三种模式经常出现:
示例:基于权益的奖励(每步触发)¶
from qtrade.env import RewardScheme
class EquityChangeReward(RewardScheme):
    """Reward each step with the equity change since the previous step."""

    def __init__(self):
        """Start with no equity baseline recorded."""
        self._prev_equity = None

    def reset(self):
        """Forget the baseline so the next step starts from scratch."""
        self._prev_equity = None

    def get_reward(self, env) -> float:
        """Return the equity delta, or 0.0 on the first step after a reset."""
        current = env._broker.equity
        # Swap in the new baseline while keeping the old one for the delta.
        previous, self._prev_equity = self._prev_equity, current
        if previous is None:
            return 0.0
        return float(current - previous)
示例:差分 Sharpe 比率¶
奖励风险调整后收益而非原始盈亏 —— 见 Moody & Saffell (1998):
from qtrade.env import RewardScheme
class DifferentialSharpe(RewardScheme):
    """Differential Sharpe ratio reward (Moody & Saffell, 1998).

    Keeps exponential moving averages of the per-step return and squared
    return, and rewards each step with its marginal effect on the Sharpe
    ratio estimated from those averages.
    """

    def __init__(self, eta: float = 0.01):
        """Initialise the reward.

        Args:
            eta: Adaptation rate of the two moving averages.
        """
        self.eta = eta
        self._A = 0.0  # EMA of return
        self._B = 0.0  # EMA of squared return
        self._prev_equity = None

    def reset(self):
        """Clear the moving averages and the equity baseline."""
        self._A = self._B = 0.0
        self._prev_equity = None

    def get_reward(self, env) -> float:
        """Return the differential Sharpe ratio for the current step.

        Returns 0.0 on the first step after a reset, when the previous equity
        was zero, and while the variance estimate is too small to divide by.
        """
        equity = env._broker.equity
        if self._prev_equity is None or self._prev_equity == 0:
            # No usable baseline yet: record one and emit a neutral reward.
            self._prev_equity = equity
            return 0.0
        r = (equity - self._prev_equity) / self._prev_equity
        self._prev_equity = equity

        dA = r - self._A
        dB = r * r - self._B
        # B - A^2 is a variance estimate, but floating-point rounding can
        # push it slightly below zero. A negative float raised to 1.5 yields
        # a complex number, and comparing that with a float raises TypeError,
        # so guard on the variance before taking the power.
        variance = self._B - self._A ** 2
        denom = variance ** 1.5 if variance > 0.0 else 0.0
        if denom > 1e-9:
            d_sharpe = (self._B * dA - 0.5 * self._A * dB) / denom
        else:
            d_sharpe = 0.0

        # Update the averages only after the reward, which uses the values
        # from the previous step.
        self._A += self.eta * dA
        self._B += self.eta * dB
        return float(d_sharpe)
示例:回撤惩罚(包装另一个奖励)¶
一个风险感知的奖励,包装任意基础 scheme 并加上与当前回撤成比例的惩罚:
from qtrade.env import RewardScheme
class DrawdownPenalty(RewardScheme):
    """Wrap another reward scheme and penalise the current drawdown."""

    def __init__(self, base: RewardScheme, penalty: float = 1.0):
        """Store the wrapped scheme and the drawdown penalty weight."""
        self.penalty = penalty
        self.base = base

    def reset(self):
        """Reset the wrapped scheme; this wrapper keeps no state itself."""
        self.base.reset()

    def get_reward(self, env) -> float:
        """Return the base reward plus ``penalty`` times the drawdown."""
        history = env._broker.equity_history.loc[:env.current_time]
        running_peak = history.cummax().iloc[-1]
        latest = history.iloc[-1]
        # Relative distance below the running peak; zero or negative, so
        # adding it lowers the reward.
        drawdown = (latest - running_peak) / running_peak
        base_reward = self.base.get_reward(env)
        return base_reward + self.penalty * float(drawdown)
组合使用¶
import yfinance as yf
from qtrade.env import TradingEnv
from qtrade.core import PercentageCommission
# Download daily bars for the "GC=F" ticker covering 2023.
data = yf.download("GC=F", start="2023-01-01", end="2024-01-01",
                   interval="1d", multi_level_index=False)
# Simple returns of the close; the first row has no predecessor, so fill it
# with 0.
data['returns'] = data['Close'].pct_change().fillna(0)

# Wire the custom action / observer / reward schemes defined above into one
# environment.
env = TradingEnv(
    data=data,
    cash=10_000,
    commission=PercentageCommission(0.001),
    action_scheme=TieredAction(),
    observer_scheme=WindowedObsWithPosition(
        window_size=10, features=['returns'],
    ),
    reward_scheme=DifferentialSharpe(eta=0.01),
    window_size=10,
    max_steps=200,
)

# Smoke test: take up to 100 random actions, stopping early if the episode
# ends.
obs, info = env.reset(seed=42)
for _ in range(100):
    action = env.action_space.sample()
    obs, reward, terminated, truncated, info = env.step(action)
    if terminated or truncated:
        break
用 stable-baselines3 训练这个环境的方法见 Gym 交易环境。