近期我收集了许多身边的数据做统计, 将自己一直以来的股票持仓也上报了. 数据是使用InfluxDB存储的,
前端就也用Grafana来做权限控制以及页面展示, 所以我想分享一下对于InfluxDB的一些使用以及常用的一些查询语句,
此文章分享的内容不构成任何投资建议.
目前国金这个账户我持仓大约17W, 收益率10%左右. 和那些动辄几百万, 收益率翻倍的大佬们肯定没得比.
我在博客中增加了一栏用作展示Grafana图表, 可以直接查看 Stocks
数据源
我在A股玩了几年了, 去年发现有QMT这个工具, 可以使用Python与自己的证券账户交互
http://docs.thinktrader.net/pages/040ff7/#%E8%BF%85%E6%8A%95xtquant-faq
开通的方式比较简单, 国金30W的门槛, 咨询下客户经理就能开.
去年的时候我自己写了工具来做定投, 近期收集身边数据时, 正好将持仓以及收益也收集起来.
其实有不少其他数据, 但我感觉自己的持仓是最适合作图分享的. 可能读者也更感兴趣一点.
以下是写入数据的函数, 没想到写博客的时候发现自己的代码写的有点随意, 先能跑吧.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| p = Point("balance") \ .tag('account', stock_account) \ .field("total_asset", b.total_asset) \ .field("market_value", b.market_value) \ .time(dt.datetime.utcnow(), WritePrecision.S) with client.write_api(write_options=SYNCHRONOUS) as write_api: write_api.write(bucket=bucket, record=p)
for s in stocks: p = Point("position") \ .tag('account', stock_account) \ .tag('stock_code', s.code) \ .tag('stock_name', s.name) \ .tag('market', s.market) \ .field('volume', s.volume) \ .field('volume_price', s.volume_price) \ .field('market_price', s.market_price) \ .field('open_price', s.open_price) \ .field('position_ratio', round(s.market_price / b.market_value, 3)) \ .time(dt.datetime.utcnow(), WritePrecision.S)
with client.write_api(write_options=SYNCHRONOUS) as write_api: write_api.write(bucket=bucket, record=p)
|
可以点击图片放大来查看这一示例
一些InfluxDB的Query语句分享
我在制作图表时, 有机会能使用InfluxDB来做查询, 因此整理了一些我使用到的语句.
它能自建函数, 功能着实强大, 希望分享能给读者带来一些启发.
展示持仓以及总计
1 2 3 4 5 6 7 8
| from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "balance") |> filter(fn: (r) => r["_field"] == "market_value") |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false) |> drop(columns: ["_field", "account", "stock_code", "market"]) |> set(key: "_measurement", value: "持仓金额") |> yield(name: "last")
|
这是最基础的图表
计算整体的仓位情况
1 2 3 4 5 6 7 8 9 10 11 12
| from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "balance") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> map(fn: (r) => ({ _time: r._time, _value: r.market_value / r.total_asset, })) |> drop(columns: ["_field", "account", "stock_code", "market", "total_asset", "market_value"]) |> set(key: "_measurement", value: "仓位")
|
InfluxDB源数据:
Grafana中, 可以制作
计算自己的实际收益率
这个图表中, 其实包含了自己投入的金额, 这个数据理论上是随时间变化的, 比如我这个月投入了1.5W,
那么实际的收益率应该用更新后的持仓数据来计算.
我能想到的方案是, 添加一个函数用来增加字段, 这个字段的意义时表明当前已经投入的金额,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| getBase = (r) => { dtime = r._time data = if dtime > time(v: "2024-07-15T19:00:00Z") then 160000.0 else 145000.0 return data }
from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "balance") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> map(fn: (r) => ({ _time: r._time, _measurement: r._measurement, base: getBase(r), _value: (r.total_asset - getBase(r)) / getBase(r),
})) |> drop(columns: ["_field", "account", "stock_code", "market", "total_asset", "market_value", "base"]) |> set(key: "_measurement", value: "收益率")
|
但是我觉得这个getBase
函数并不够优雅, 所以这里, 我给一种打表的方案:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import "array"
inputVal = [ {date: time(v: "2024-07-15T19:00:00Z"), val: 160000.0 }, {date: time(v: "2024-01-01T19:00:00Z"), val: 145000.0 }, ]
getBase = (query) => { tdata = ( inputVal |> array.filter(fn: (x) => { return query._time >= x.date }) ) return tdata[0].val }
from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "balance") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") |> map(fn: (r) => ({ _time: r._time, _measurement: r._measurement, base: getBase(query: r), _value: (r.total_asset - getBase(query:r)) / getBase(query:r), })) |> drop(columns: ["_field", "account", "stock_code", "market", "total_asset", "market_value", "base"]) |> set(key: "_measurement", value: "收益率")
|
实际的收益率曲线会比较光滑
另外, 我再提供另一种思路, 可以记录每次转账的金额, 使用reduce来计算总额
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| import "array"
inputVal = [ {date: time(v: "2024-07-15T19:00:00Z"), val: 15000.0 }, {date: time(v: "2024-01-01T19:00:00Z"), val: 145000.0 }, ]
getBase = (query) => { tdata = ( array.from(rows:inputVal) |> reduce( identity: {totalInput: 0.0}, fn: (r, accumulator) => ({ totalInput: if query._time > r.date then accumulator.totalInput + r.val else accumulator.totalInput })) |> yield() |> findRecord(fn:(key) => true, idx: 0) ) return tdata.totalInput }
|
个股持仓分布
这个比较简单, 按照market_price来做饼图就好
1 2 3 4 5 6 7
| from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "position") |> filter(fn: (r) => r["_field"] == "market_price") |> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false) |> drop(columns: ["_field", "account", "stock_code", "market"]) |> yield(name: "last")
|
个股收益率情况
1 2 3 4 5 6 7 8 9 10 11 12
| from(bucket: "Stock") |> range(start: v.timeRangeStart, stop: v.timeRangeStop) |> filter(fn: (r) => r["_measurement"] == "position") |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") |> map(fn: (r) => ({ r with _time: r._time, _measurement: r._measurement, _value: (r["volume_price"] - r["open_price"]) / r["open_price"], })) |>group(columns: ["stock_name"]) |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
|
总结
有人肯定会问, 为什么你要免费分享持仓数据. 我这里想说, 即使你看了我的仓位分布, 你可能大概率也不会
买我的这些股票, 下面是原因:
- 根本不在乎理论, 就像已经有了凯利公式, 还会永远满仓, 永远热泪盈眶
- 根本看不上我这10%的收益
- 已经存在的大量付费咨询的群, 很多股民更加相信群友判断或是小道消息
不过, 我还是再强调一下: 此文章仅分享技术, 不构成任何投资建议.