译者 | 杨晓娟
摘要
SingleStore 是一个非常通用的数据库系统。它基于关系型技术,支持多模型功能,如键值、JSON、全文搜索、地理空间和时间序列。
介绍
自关系数据库技术出现以来,许多管理数据的新需求应运而生。马丁·福勒(Martin Fowler) 等知名人士提出了混合持久化(Polyglot Persistence)作为管理各种数据和数据处理需求的一种解决方案,如图 1 所示。
然而,混合持久化是有代价的,并招致了非议,比如:
在一篇经常被引用的混合持久化帖子中,马丁·福勒为一家虚构的零售商绘制了一个 Web 应用程序,该应用程序使用 Riak、Neo4j、MongoDB、Cassandra和一个 RDBMS 来处理不同的数据集。不难想象,他的零售商的 DevOps 工程师会一个接一个地辞职。
? —斯蒂芬·皮门特尔(Stephen Pimentel)
此外:
我过去曾看到,如果你尝试采用其中的六种[技术],你至少需要 18 名员工来操作存储端——就是说,六种存储技术。那样是不可扩展的,而且成本太高。
? —大卫·麦克罗里( Dave McCrory)
近年来,也有一些使用微服务来实现混合持久化架构的建议。但是,SingleStore 可以通过在单个多模型数据库系统中支持不同的数据类型和处理需求来提供更简单的解决方案。这带来了许多好处,例如更低的 TCO(总拥有成本)、开发人员学习多种产品的负担更少、没有集成的麻烦等等。我们将在一系列文章中更详细地讨论 SingleStore 的多模型功能,现在则从时间序列数据开始。
如果你没有 Kaggle 帐户,请创建一个并下载all_stocks_5yr.csv文件。 Kaggle 站声明该文件大小为 29.58 MB。数据集由以下字段组成:
date:从2013年2月8日到2018年2月7日的五年每日期间。没有缺失值。
open:开盘价。11个缺失值。
high:最高价。8个缺失值。
low:最低价。8个缺失值。
close:收盘价。没有缺失值。
volume:成交量。没有缺失值。
name:交易代码。505个唯一值。没有缺失值。
在开始阶段,我们会用到date、close和name信息。
配置 Databricks CE
上一篇文章给出了有关如何配置 Databricks CE以及和 SingleStore 一起使用的详细说明,我们可以在这个用例中使用它们。
上传 CSV 文件
要使用CSV文件,我们需要将其上传到 Databricks CE 环境。上一篇文章提供了有关如何上传CSV文件的详细说明,我们可以在这个用例中使用它们。
创建数据库表
复制SQL: CREATE DATABASE IF NOT EXISTS timeseries_db;1.2.
再创建一个表,如下所示:
复制SQL:USE timeseries_db;CREATE ROWSTORE TABLE IF NOT EXISTS tick (
ts DATETIME SERIES TIMESTAMP,
symbol VARCHAR(5),
price NUMERIC(18, 4),
KEY(ts));1.2.3.4.5.6.7.8.
每行有一个叫作 ts的时间值属性。我们使用DATETIME而不是DATETIME(6),因为在本例中我们不使用小数秒。SERIES TIMESTAMP将表列指定为默认时间戳。在ts上创建一个KEY,因为这能让我们高效地筛选值的范围。
填写笔记本
现在新建一个 Databricks CE Python 笔记本,名为Data Loader for Time Series。把新笔记本附加到 Spark 集群上。
在一个新的代码单元中,添加以下代码:
复制Python:from pyspark.sql.types import *tick_schema = StructType([ StructField(“ts”, TimestampType(), True), StructField(“open”, DoubleType(), True), StructField(“high”, DoubleType(), True), StructField(“low”, DoubleType(), True), StructField(“price”, DoubleType(), True), StructField(“volume”, IntegerType(), True), StructField(“symbol”, StringType(), True)
])1.2.3.4.5.6.7.8.9.10.11.12.
此模式确保我们有正确的列类型。
在下一个代码单元格中新建一个 Dataframe,如下所示:
复制Python:tick_df = spark.read.csv(“/FileStore/all_stocks_5yr.csv”, header = True,
schema = tick_schema)1.2.3.4.
这会读取CSV文件并创建一个名为tick_df的Dataframe。 我们还告诉Spark有一个标题行,并要求它使用前面定义的模式。
在下一个代码单元中,我们获取行数:
复制Python:tick_df.count()1.2.
执行此操作,得到数值 619040。
根据先前的初步分析决定,我们删除掉一些列,如下所示:
复制Python:tick_df = tick_df.drop(“open”, “high”, “low”, “volume”)1.2.
并对数据进行排序:
复制Python:tick_df = tick_df.sort(“ts”, “symbol”)1.2.
在下一个代码单元中,我们查看一下 Dataframe 的结构:
复制Python:tick_df.show(10)1.2.
输出如下所示:
复制Plain Text:
+——————-+——-+——+
| ts | price|symbol|
+——————-+——-+——+
|2013-02-08 00:00:00| 45.08| A|
|2013-02-08 00:00:00| 14.75| AAL|
|2013-02-08 00:00:00| 78.9| AAP|
|2013-02-08 00:00:00|67.8542| AAPL|
|2013-02-08 00:00:00| 36.25| ABBV|
|2013-02-08 00:00:00| 46.89| ABC|
|2013-02-08 00:00:00| 34.41| ABT|
|2013-02-08 00:00:00| 73.31| ACN|
|2013-02-08 00:00:00| 39.12| ADBE|
|2013-02-08 00:00:00| 45.7| ADI|
+——————-+——-+——+
only showing top 10 rows1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.
现在准备将 Dataframe 写到 SingleStore。 在下一个代码单元中,可以添加以下内容:
复制Python:%run ./Setup1.2.
在Setup笔记本中,需要确保已为 SingleStore 托管服务集群添加了服务器地址和密码。
在下一代码单元中,我们将为 SingleStore Spark 连接器设置一些参数,如下所示:
复制Python:spark.conf.set(“spark.datasource.singlestore.ddlEndpoint”, cluster)spark.conf.set(“spark.datasource.singlestore.user”, “admin”)spark.conf.set(“spark.datasource.singlestore.password”, password)spark.conf.set(“spark.datasource.singlestore.disablePushdown”, “false”)1.2.3.4.5.
最后,准备使用 Spark Connector将 Dataframe 写入 SingleStore
复制Python:
(tick_df.write
.format(“singlestore”)
.option(“loadDataCompression”, “LZ4”)
.mode(“ignore”)
.save(“timeseries_db.tick”))1.2.3.4.5.6.
这会将 Dataframe 写入timeseries_db数据库中的tick表。可以从 SingleStore 检查该表是否已成功填充。
示例查询
现在我们已经构建了系统,可以运行一些查询了。 SingleStore 支持一系列处理时间序列数据的有用函数。我们来看一些例子。
平均值
以下查询说明了如何计算表中全部时间序列值的简单平均值:
复制SQL:SELECT symbol, AVG(price)FROM tickGROUP BY symbolORDER BY symbol;1.2.3.4.5.
输出应该是:
复制Plain Text:
+——–+—————+
| symbol | AVG(price) |
+——–+—————+
| A | 49.20202542 |
| AAL | 38.39325226 |
| AAP | 132.43346307 |
| AAPL | 109.06669849 |
| ABBV | 60.86444003 |
… …1.2.3.4.5.6.7.8.9.10.
时间分段
时间分段可以按固定时间间隔对不同时间序列的数据进行聚合和分组。SingleStore 支持几种函数:
FIRST:与最小时间戳关联的值。此文档包含其他详细信息和示例。
LAST:与最大时间戳关联的值。此文档包含其他详细信息和示例。
TIME_BUCKET:将时间标准化为最近的存储段的开始时间。此文档包含其他详细信息和示例。
例如,可以使用TIME_BUCKET查询以五天为时间间隔进行分组的平均时间序列值,如下所示:
复制SQL:SELECT symbol, TIME_BUCKET(“5d”, ts), AVG(price)FROM tickWHERE symbol = “AAPL”GROUP BY 1, 2ORDER BY 1, 2;1.2.3.4.5.6.
输出应该是:
复制Plain Text:
+——–+———————–+————–+
| symbol | TIME_BUCKET(“5d”, ts) | AVG(price) |
+——–+———————–+————–+
| AAPL | 2013-02-08 00:00:00.0 | 67.75280000 |
| AAPL | 2013-02-13 00:00:00.0 | 66.36943333 |
| AAPL | 2013-02-18 00:00:00.0 | 64.48960000 |
| AAPL | 2013-02-23 00:00:00.0 | 63.63516667 |
| AAPL | 2013-02-28 00:00:00.0 | 61.51996667 |
… … …1.2.3.4.5.6.7.8.9.10.
还可以结合这些函数来创建烛台图,显示股票随时间的最高价、最低价、开盘价和收盘价,以五天为一个窗口单位,如下所示:
复制SQL:SELECT TIME_BUCKET(“5d”) AS ts,
symbol,
MIN(price) AS low,
MAX(price) AS high,
FIRST(price) AS open,
LAST(price) AS closeFROM tickWHERE symbol = “AAPL”GROUP BY 2, 1ORDER BY 2, 1;1.2.3.4.5.6.7.8.9.10.11.
输出应该是:
复制Plain Text:
+————+——–+———-+———-+———-+———-+
| ts | symbol | low | high | open | close |
+————+——–+———-+———-+———-+———-+
| 2013-02-08 | AAPL | 66.8428 | 68.5614 | 67.8542 | 66.8428 |
| 2013-02-13 | AAPL | 65.7371 | 66.7156 | 66.7156 | 65.7371 |
| 2013-02-18 | AAPL | 63.7228 | 65.7128 | 65.7128 | 64.4014 |
| 2013-02-23 | AAPL | 63.2571 | 64.1385 | 63.2571 | 63.5099 |
| 2013-02-28 | AAPL | 60.0071 | 63.0571 | 63.0571 | 60.0071 |
… … … … … …1.2.3.4.5.6.7.8.9.10.
平滑
可以使用AVG对窗口进行聚合来平滑时间序列数据。下面是一个示例,查看价格和过去三个分时价格的移动均线:
复制SQL:SELECT symbol, ts, price, AVG(price)OVER (ORDER BY ts ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) AS smoothed_priceFROM tickWHERE symbol = “AAPL”;1.2.3.4.5.
输出应该是:
复制Plain Text:
+——–+———————–+———-+—————-+
| symbol | ts | price | smoothed_price |
+——–+———————–+———-+—————-+
| AAPL | 2013-02-08 00:00:00.0 | 67.8542 | 67.85420000 |
| AAPL | 2013-02-11 00:00:00.0 | 68.5614 | 68.20780000 |
| AAPL | 2013-02-12 00:00:00.0 | 66.8428 | 67.75280000 |
| AAPL | 2013-02-13 00:00:00.0 | 66.7156 | 67.49350000 |
| AAPL | 2013-02-14 00:00:00.0 | 66.6556 | 67.19385000 |
… … … …1.2.3.4.5.6.7.8.9.10.
截至
查找截至某个时间点的当前表行也是常见的时间序列需求。这可以用ORDER BY和LIMIT轻松实现。下面是一个例子:
复制SQL:SELECT *FROM tickWHERE ts
输出应该是:
复制Plain Text:
+———————–+——–+———-+
| ts | symbol | price |
+———————–+——–+———-+
| 2018-02-07 00:00:00.0 | AAPL | 159.5400 |
+———————–+——–+———-+1.2.3.4.5.6.
插值
时间序列数据可能存在缺值。我们可以插入缺失的点。 SingleStore文档提供了一个示例存储过程,可在处理tick数据时用于此目的。
加分:Streamlit 可视化
之前提到过烛台图,如果能以图形而不是表格的形式看到这些图表就太好了,而使用Streamlit可以轻松做到这一点。上一篇文章展示了我们可以轻松地将 Streamlit 连接到 SingleStore。
安装所需软件
我们需要安装以下软件包:
复制Plain Text:
streamlit
pandas
plotly
Pymysql1.2.3.4.5.
这些可以在GitHub上的requirements.txt文件中找到。运行文件如下:
复制Shell:
pip install -r requirements.txt1.2.
示例应用程序
以下是 streamlit_app.py 的完整代码清单:
复制Python:# streamlit_app.pyimport streamlit as stimport pandas as pdimport plotly.graph_objects as goimport pymysql# Initialize connection.def init_connection(): return pymysql.connect(**st.secrets[“singlestore”])conn = init_connection()symbol = st.sidebar.text_input(“Symbol”, value = “AAPL”, max_chars = None, key = None, type = “default”)num_days = st.sidebar.slider(“Number of days”, 2, 30, 5)# Perform query.data = pd.read_sql(“””SELECT TIME_BUCKET(%s) AS day, symbol, MIN(price) AS low, MAX(price) AS high, FIRST(price) AS open, LAST(price) AS closeFROM tickWHERE symbol = %sGROUP BY 2, 1ORDER BY 2, 1;”””, conn, params = (str(num_days) + “d”, symbol.upper()))st.subheader(symbol.upper())fig = go.Figure(data = [go.Candlestick( x = data[“day”], open = data[“open”], high = data[“high”], low = data[“low”], close = data[“close”], name = symbol,
)])fig.update_xaxes(type = “category”)fig.update_layout(height = 700)st.plotly_chart(fig, use_container_width = True)st.write(data)1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.
创建机密文件
本地 Streamlit 应用程序会从应用程序的根目录读取机密文件 .streamlit/secrets.toml。需要按如下方式创建这个文件:
复制Plain Text:
# .streamlit/secrets.toml
[singlestore]
host = “”
port = 3306
database = “timeseries_db”
user = “admin”
password = “”1.2.3.4.5.6.7.8.9.
主机和密码的应替换为在创建集群时从 SingleStore 托管服务获取的相应值。
运行代码
可按如下方式运行 Streamlit 应用程序:
复制Shell:
streamlit run streamlit_app.py1.2.
在Web 浏览器中的输出应如图 2 所示。
在 页上,可以在文本框中输入一个新的股票代码,并使用滑块来更改TIME_BUCKET的天数。随意尝试代码以满足您的需求。
总结
致谢
感谢约翰·皮克福德(John Pickford) 博士对恰当的时间序列数据集的建议和指导。
译者介绍
声明:本站部分文章及图片源自用户投稿,如本站任何资料有侵权请您尽早请联系jinwei@zod.com.cn进行处理,非常感谢!