diff --git a/benchmark/pax_storage_concurrency_test.cpp b/benchmark/pax_storage_concurrency_test.cpp index f1211a9fb..1702755de 100644 --- a/benchmark/pax_storage_concurrency_test.cpp +++ b/benchmark/pax_storage_concurrency_test.cpp @@ -117,7 +117,7 @@ class BenchmarkBase : public Fixture table_meta_->fields_[1].attr_len_ = 11; table_meta_->fields_[1].field_id_ = 1; handler_ = new RecordFileHandler(StorageFormat::PAX_FORMAT); - rc = handler_->init(*buffer_pool_, log_handler_, table_meta_); + rc = handler_->init(*buffer_pool_, log_handler_, table_meta_, nullptr); if (rc != RC::SUCCESS) { LOG_WARN("failed to init record file handler. rc=%s", strrc(rc)); throw runtime_error("failed to init record file handler"); diff --git a/benchmark/record_manager_concurrency_test.cpp b/benchmark/record_manager_concurrency_test.cpp index 2246ee9fd..fcdd28a5f 100644 --- a/benchmark/record_manager_concurrency_test.cpp +++ b/benchmark/record_manager_concurrency_test.cpp @@ -106,7 +106,7 @@ class BenchmarkBase : public Fixture } handler_ = new RecordFileHandler(StorageFormat::ROW_FORMAT); - rc = handler_->init(*buffer_pool_, log_handler_, nullptr); + rc = handler_->init(*buffer_pool_, log_handler_, nullptr, nullptr); if (rc != RC::SUCCESS) { LOG_WARN("failed to init record file handler. rc=%s", strrc(rc)); throw runtime_error("failed to init record file handler"); diff --git a/docs/docs/design/miniob-how-to-add-new-datatype.md b/docs/docs/design/miniob-how-to-add-new-datatype.md new file mode 100644 index 000000000..4ad9bb0ff --- /dev/null +++ b/docs/docs/design/miniob-how-to-add-new-datatype.md @@ -0,0 +1,32 @@ +--- +title: 如何新增一种数据类型 +--- + +> 本文介绍如何新增一种数据类型。 +MiniOB 的数据类型系统采用分层设计,实现集中在[path](../../../src/observer/common)文件夹下,核心组件包括: +1. Value 类:统一数据操作接口 +路径:src/observer/common/value.h +作用:封装实际数据值,提供类型无关的操作方法 +2. Type 工具类:特定类型的操作实现 +路径:src/observer/common/type/ +作用:每种数据类型对应一个工具类,实现具体运算逻辑 + +以下示例展示 MiniOB 如何处理整数类型数据: +```cpp +// 假设解析器识别到整数 "1" +int val = 1; +Value value(val); // 封装为 Value 对象 +// 执行加法运算 +Value result; +Value::add(value, value, result); // 调用加法接口 +// Value::add 方法内部会根据类型调用对应工具类 +// 对于 INT 类型,实际调用代码位于: +// src/observer/common/type/integer_type.cpp +``` + +# 若要新增一种数据类型(如 DATE),建议按以下步骤开发: +1. 在 src/observer/common/type/attr_type.h 中添加新的类型枚举以及对应类型名 +2. 在 src/observer/common/type/data_type.cpp 中添加新的类型实例 +3. 在 src/observer/common/type/ 文件夹下,参照现有工具类,实现 DateType 工具类 +4. 在 Value 类中增加类型处理逻辑,支持date类型的分发,储存date类型值 +5. 必要情况下还需要增加新的词法规则(lex_sql.l)以及语法规则(yacc_sql.y),支持新类型关键字 \ No newline at end of file diff --git a/docs/docs/design/miniob-realtime-analytic.md b/docs/docs/design/miniob-realtime-analytic.md new file mode 100644 index 000000000..e10feb195 --- /dev/null +++ b/docs/docs/design/miniob-realtime-analytic.md @@ -0,0 +1,518 @@ +--- +title: 暑期学校2025(实时分析) +--- +> 请不要将代码提交到公开仓库(包括提交带有题解的 Pull Request),同时也请不要抄袭其他同学或网络上可能存在的代码。 + +# 暑期学校2025(实时分析) + +本篇文档为中国数据库暑期学校 2025 中 MiniOB 部分实训文档。MiniOB 用 C++ 编写,是一个教学用途的数据库,其目标是为在校学生、数据库从业者、爱好者提供一个友好的数据库学习项目,更好地将理论、实践进行结合,提升同学们的工程实战能力。在本次实训中,需要同学们基于 MiniOB 完成如下四个主题的实验。 + +* LAB#1 PAX 存储和数据导入 +* LAB#2 向量化执行算子实现 +* LAB#3 物化视图实现 +* LAB#4 实时分析性能优化 + +## 在实训之前你需要了解的 + +由于很多同学是第一次接触 MiniOB,可能对实训流程没有概念。所以这里提供一个简单的实训流程,以及一个入门小任务(**不需要写代码**)帮助大家熟悉开发环境和测试平台,并获得本次实训的基础分。 + +### MiniOB 实训流程 + +1. **学习 C++ 编程基础。** MiniOB 用 C++ 编写,如果你没有 C++ 编程基础,请先参考本文中 `c++编程语言学习` 的参考资料(或者其他任何资料)学习 C++ 基础知识。可以通过 Cpplings: `src/cpplings/README.md` 来做一个编程练习。 +2. **准备自己的代码仓库。** 请参考[文档](../game/github-introduction.md),在 Github/Gitee 上创建自己的**私有代码**仓库。(**请不要创建公开仓库,也不要将代码提交到公开仓库。**) +3. 
**准备自己的开发环境。** 我们为大家提供了一个[开源学堂在线编程环境](TODO),在准备本地开发环境存在困难的情况下可以使用开源学堂在线编程环境进行实验,在线编程环境已经提供了可以直接用于 MiniOB 编程的环境,便于大家快速开始。对于希望在本地准备开发环境的同学,[这篇文档](../dev-env/introduction.md) 中已经介绍的十分详细,请先认真阅读。如果仍有疑问,欢迎提问,也非常欢迎刚刚入门的同学分享自己准备开发环境的经验。 +4. **浏览本篇文档的实验部分。** 需要大家理解各个题目描述功能,写代码,实现题目要求的功能。[MiniOB 教程](https://open.oceanbase.com/course/427)中有一些视频讲解教程,可以辅助学习 MiniOB。 +5. **提交自己的代码到线上测试平台**(需要使用 git 命令提交代码,可参考[文档](../game/git-introduction.md)),在训练营中提交测试(参考[训练营使用说明](https://ask.oceanbase.com/t/topic/35600372) )。 +6. **继续重复上述 4-5 步骤,直到完成所有实验题目。** 提交代码后,训练营会自动运行你的代码,返回测试结果。你需要根据训练营中返回的日志信息,继续修改自己的代码/提交测试。并不断重复这一过程,直到完成所有实验题目。 + +### 入门小任务 + +这里提供给大家一个入门小任务,帮助大家熟悉开发环境和测试平台。这个小任务完全不需要编写任何 C++代码,主要用于给大家熟悉实验环境。 + +**任务描述**:创建自己的 MiniOB 代码仓库,在开发环境中验证 MiniOB 基础功能,并将代码提交到测试平台通过 `basic` 题目。 + +#### 1. 学会使用 Git/Github,并创建自己的 MiniOB 代码仓库 +在本实验课程中,我们使用 [Git](https://git-scm.com/) 进行代码版本管理(可以理解为用来追踪代码的变化),使用 [GitHub](https://github.com/)/[Gitee](https://gitee.com/) 做为代码托管平台。如果对于 Git/GitHub 的使用不熟悉,可以参考Git/Github 的官方文档。这里也提供了简易的[Git 介绍文档](../game/git-introduction.md)帮助大家了解部分 Git 命令。 + +如果你已经对 Git/Github 的使用有了一定的了解,可以参考[Github 使用文档](../game/github-introduction.md) 或 [Gitee 使用文档](../game//gitee-instructions.md)来创建自己的 MiniOB 代码仓库(**注意:请创建私有(Private)仓库**)。 + +#### 2. 准备自己的开发环境 +MiniOB 的开发环境需要使用 Linux/MacOS 操作系统,建议使用 Linux 操作系统。我们为大家准备好了一个[开源学堂在线编程环境](./cloudlab_setup.md)(**建议大家优先使用在线编程环境,避免由于自己开发环境问题导致的bug**),除此之外,我们准备了详细的[本地开发环境准备文档](../dev-env/introduction.md)。 + +#### 3. 在开发环境中构建调试 MiniOB,并验证 MiniOB 的基本功能 +在准备好自己的开发环境后,你就可以下载 MiniOB 代码,编译 MiniOB 并运行测试用例,验证 MiniOB 的基本功能。 + +* 下载 MiniOB 代码,**注意:这里请使用自己的私有仓库地址** +``` +git clone https://github.com/oceanbase/miniob.git +``` + +* 在 MiniOB 代码目录下,运行下面命令来编译 MiniOB +``` +bash build.sh debug +``` + +* 进入 build_debug 目录 +``` +cd build_debug/ +``` + +* 在 MiniOB 代码目录下,运行下面命令来启动 MiniOB +``` +./bin/observer +``` + +* 打开另一个终端,进入 build_debug 目录,运行下面命令来启动 MiniOB client +``` +./bin/obclient +``` + +* 在 obclient 中分别执行下面的 SQL 语句,并查看输出结果是否符合预期。 +``` +create table t1 (id int, name char(10)); +insert into t1 values (1, 'hello'); +select * from t1; +``` + +* 如果一切顺利,你的终端将会展示如下的结果: +``` +$./bin/obclient + +Welcome to the OceanBase database implementation course. + +Copyright (c) 2021 OceanBase and/or its affiliates. + +Learn more about OceanBase at https://github.com/oceanbase/oceanbase +Learn more about MiniOB at https://github.com/oceanbase/miniob + +miniob > create table t1 (id int, name char(10)); +SUCCESS +miniob > insert into t1 values (1, 'hello'); +SUCCESS +miniob > select * from t1; +id | name +1 | hello +miniob > +``` + +#### 4. 将代码提交到测试平台,并通过 `basic` 题目 + +测试平台中的 `basic` 题目是用来验证 MiniOB 的基本功能的(如创建表,插入数据,查询数据等),原始的 MiniOB 代码(**不需要任何代码修改**)就可以通过测试。你需要参考[文档](https://ask.oceanbase.com/t/topic/35600372)来将 MiniOB 代码提交到测试平台进行测试,并通过 `basic` 题目。至此,恭喜你已经顺利熟悉了开发环境和测试平台的使用。 + +### 注意事项 + +1. **最重要的一条**:请不要将代码提交到公开仓库(包括提交带有题解的 Pull Request),同时也请不要抄袭其他同学或网络上可能存在的代码。 +2. 
实验文档中提供了较为详细的指导,请优先仔细阅读文档。文档无法做到囊括所有问题的程度,如果还有问题也请随时提问。对于文档中的**注意** 提示,请认真阅读。 + +### C++ 编程语言学习 + +实训需要大家具备一定的 C++ 编程基础,这里推荐几个 C++ 基础学习的链接: + +- [LearnCpp](https://www.learncpp.com/)(可以作为教程) +- Cpplings: `src/cpplings/README.md`(可以作为小练习) +- [cppreference](en.cppreference.com)(可以作为参考手册) + + +## LAB#1 PAX 存储和数据导入 +在本实验中,你需要实现 PAX 的存储格式,并支持将 CSV 格式的数据导入到 MiniOB 中。 + +### 存储模型(Storage Models) + +数据库的存储模型规定了它如何在磁盘和内存中组织数据。首先,我们来介绍下三种经典的存储模型。 + +#### N-ARY Storage Model (NSM) + +在 NSM 存储模型中,一行记录的所有属性连续存储在数据库页面(Page)中,这也被称为行式存储。NSM 适合 OLTP 工作负载。因为在 OLTP 负载中,查询更有可能访问整个记录(对整个记录进行增删改查)。 + +``` + Col1 Col2 Col3 Page + ┌─────────────┬─┐ ┌──────────┬──────────┐ +Row1│ a1 b1 c1│ │ │PageHeader│ a1 b1 c1 │ + ├─────────────┤ │ ├────────┬─┴──────┬───┤ +Row2│ a2 b2 c2│ │ │a2 b2 c2│a3 b3 c3│.. │ + ├─────────────┤ │ ├────────┴────────┴───┤ +Row3│ a3 b3 c3│ │ │... │ + ├─────────────┘ │ ├─────────────────────┤ +... │ .. .. .. │ │... │ +... │ .. .. .. │ └─────────────────────┘ + │ │ +RowN│ an bn cn │ + └───────────────┘ +``` + +#### Decomposition Storage Model (DSM) + +在 DSM 存储模型中,所有记录的单个属性被连续存储在数据块/文件中。这也被称为列式存储。DSM 适合 OLAP 工作负载,因为 OLAP 负载中往往会对表属性的一个子集执行扫描和计算。 + +``` + File/Block + Col1 Col2 Col3 ┌─────────────────────┐ + ┌────┬────┬────┬┐ │ Header │ +Row1│ a1 │ b1 │ c1 ││ ├─────────────────────┤ + │ │ │ ││ │a1 a2 a3 ......... an│ +Row2│ a2 │ b2 │ c2 ││ └─────────────────────┘ + │ │ │ ││ +Row3│ a3 │ b3 │ c3 ││ ┌─────────────────────┐ + │ │ │ ││ │ Header │ + │ . │....│. ││ ├─────────────────────┤ +... │ │ │ ││ │b1 b2 b3 ......... bn│ +... │ .. │ .. │ .. ││ └─────────────────────┘ +RowN│ an │ bn │ cn ││ + └────┴────┴────┴┘ ┌─────────────────────┐ + │ Header │ + ├─────────────────────┤ + │c1 c2 c3 ......... cn│ + └─────────────────────┘ +``` + +#### Partition Attributes Across (PAX) + +PAX (Partition Attributes Across) 是一种混合存储格式,它在数据库页面(Page)内对属性进行垂直分区。 + +``` + Col1 Col2 Col3 Page + ┌─────────────┬─┐ ┌──────────┬──────────┐ +Row1│ a1 b1 c1│ │ │PageHeader│ a1 a2 a3 │ + │ │ │ ├──────────┼──────────┤ +Row2│ a2 b2 c2│ │ │b1 b2 b3 │ c1 c2 c3 │ + │ │ │ └──────────┴──────────┘ +Row3│ a3 b3 c3│ │ .... + ├─────────────┘ │ ┌──────────┬──────────┐ + │ ......... │ │PageHeader│ ..... an │ +... ├─────────────┐ │ ├──────────┼──────────┤ +... │ .. .. ..│ │ │...... bn │ ..... cn │ +RowN│ an bn cn│ │ └──────────┴──────────┘ + └─────────────┴─┘ +``` + +### MiniOB 中 PAX 存储格式 + +### PAX 存储格式实现 + +在 MiniOB 中,RecordManager 负责一个文件中表记录(Record)的组织/管理。在没有实现 PAX 存储格式之前,MiniOB 只支持行存格式,每个记录连续存储在页面(Page)中,通过`RowRecordPageHandler` 对单个页面中的记录进行管理。需要通过实现 `PaxRecordPageHandler` 来支持页面内 PAX 存储格式的管理。 +Page 内的 PAX 存储格式如下: +``` +| PageHeader | record allocate bitmap | column index | +|------------|------------------------| ------------- | +| column1 | column2 | ..................... | columnN | +``` +其中 `PageHeader` 与 `bitmap` 和行式存储中的作用一致,`column index` 用于定位列数据在页面内的偏移量,每列数据连续存储。 + +`column index` 结构如下,为一个连续的数组。假设某个页面共有 `n + 1` 列,分别为`col_0, col_1, ..., col_n`,`col_i` 表示列 ID(column id)为 `i + 1`的列在页面内的起始地址(`i < n`)。当 `i = n`时,`col_n` 表示列 ID 为 `n` 的列在页面内的结束地址 + 1。 +``` +| col_0 | col_1 | col_2 | ...... 
| col_n | +|-------|-------|-------|---------|-------| +``` + + +MiniOB 支持了创建 PAX 表的语法。当不指定存储格式时,默认创建行存格式的表。 +``` +CREATE TABLE table_name + (table_definition_list) [storage_format_option] + +storage_format_option: + storage format=row + | storage format=pax +``` +示例: + +创建行存格式的表: + +```sql +create table t(a int,b int) storage format=row; +create table t(a int,b int); +``` + +创建列存格式的表: +```sql +create table t(a int,b int) storage format=pax; +``` + +#### 实验 + +实现 PAX 存储格式,需要完成 `src/observer/storage/record/record_manager.cpp` 中 `PaxRecordPageHandler::insert_record`, `PaxRecordPageHandler::get_chunk`, `PaxRecordPageHandler::get_record` 三个函数(标注 `// your code here` 的位置),详情可参考这三个函数的注释。行存格式存储是已经在MiniOB 中完整实现的,实现 PAX 存储格式的过程中可以参考 `RowRecordPageHandler`。 + +**提示**:对应的训练营测试为 lab1-pax-storage + +#### 测试 + +通过 `unittest/pax_storage_test.cpp` 中所有测试用例。 + +注意:如果需要运行 `pax_storage_test`,请移除`DISABLED_` 前缀。 + +### 数据导入实现 +需要实现 LOAD DATA 语法,对 CSV 格式的文本文件进行导入。 + +#### 语法 +``` +-- 导入普通文件 +LOAD DATA INFILE 'file_name' INTO TABLE table_name + [ FIELDS + [TERMINATED BY 'char'] + [ENCLOSED BY 'char'] + ] +``` +参数解释: + +- `file_name`:文件路径。格式为 "/\$PATH/\$FILENAME" +- `table_name`:导入数据的表的名称。 +- `ENCLOSED BY`:设置导出值的修饰符。 +- `TERMINATED BY`:设置导出列的结束符。 + +示例用法如下: +```sql +load data infile "/data/clickdata/tmp.csv" into table hits fields terminated by "," enclosed by '"'; +``` + +**提示**:可以自行实现 CSV 格式解析或使用外部库的方式实现。 + +**提示**: 对应的训练营题目为 lab1-basic-load-data + +**提示**: 相关实现位于 `src/observer/sql/executor/load_data_executor.h`,只需要考虑导入 PAX 格式的堆表表引擎(默认表引擎)即可。请修改并完善 `src/observer/sql/executor/load_data_executor.cpp` 中的 `load_data` 函数。 + +一个 csv 格式的示例文件如下,其 `ENCLOSED BY` 为 `"`,`TERMINATED BY` 为 `,`: +``` +1,2,"abc","a,ef","435" +3,4,"xxx","a +56","456" +5,6,"yyy","a +78","456" +7,8,"zzz","a90","456" +``` + +#### 支持 ClickBench 数据集导入 +本次实训的最后一个实验需要支持 ClickBench 测试。ClickBench 是 Clickhouse 提供的一个用于分析型数据库的基准测试。在本 Lab 中,需要首先支持 ClickBench 数据集的导入。 + +主要的工作是实现三个额外的数据类型,支持 DATE,TEXT,BIGINT 三种数据类型。 + +* DATE: 用于存储日期值,通常表示年、月、日的组合。它不包含时间信息(如小时、分钟、秒)。 +* BIGINT: 64位整数类型,8字节存储。 +* TEXT: 不定长字符串,text字段的最大长度为65535个字节 + +关于数据类型的更多信息可参考 MySQL 官方文档。 + +**提示**: 对应的训练营题目为 lab1-clickbench-load-data + +**提示**: 实现 TEXT 类型时可使用 `src/observer/storage/record/lob_handler.h` 进行数据存储和读取。TEXT 类型的内存形式可考虑使用 `src/observer/common/type/string_t.h`. 
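+下面给出一段仅作示意的代码,展示本补丁中 `string_t` 的两种存储形式(内联 / 指针),便于理解 TEXT 的内存表示;其中的函数名 `string_t_demo` 仅为演示用的假设名称:
+
+```cpp
+#include <cstring>
+#include "common/type/string_t.h"
+
+// 仅作示意:string_t 对不超过 INLINE_LENGTH(12 字节)的字符串做内联存储,
+// 更长的字符串只记录外部指针,不做数据拷贝。
+void string_t_demo()
+{
+  string_t short_str("hello");                // 长度 5,内联存储在结构体内部
+  bool inlined = short_str.is_inlined();      // true
+
+  const char *buf = "this is a long string";  // 长度 > 12,只保存指针,
+  string_t long_str(buf, strlen(buf));        // 调用方需保证 buf 的生命周期足够长
+  bool by_pointer = !long_str.is_inlined();   // true,long_str.data() == buf
+  (void)inlined;
+  (void)by_pointer;
+}
+```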
TEXT 类型在 `Column` 中可以使用 `VectorBuffer` 来存储实际字符串。 + +**提示**: 需要在 `src/observer/common/value.h` 中支持新的数据类型,在 `src/observer/common/type/` 目录下完成新类型的实现。可以参照已有数据类型的实现方式。更多介绍可以参考[文档](miniob-how-to-add-new-datatype.md) + +#### 测试 +请参考 ClickBench 测试的数据导入部分: +https://github.com/oceanbase/ClickBench/tree/miniob/miniob + +```shell +# Start MiniOB +nohup $OBSERVER_BIN/observer -P mysql -s $SOCKET_FILE > /dev/null 2>&1 & + +sleep 3 +# Load the data + +mysql -S $SOCKET_FILE < create.sql + +echo "load data infile \"$DATA_FILE\" into table hits fields terminated by \",\" enclosed by '\"';" | mysql -S $SOCKET_FILE +``` + +## LAB#2 向量化执行算子实现 +LAB#2 需要在 MiniOB 中实现向量化执行中的简单聚合(ungrouped aggregation)和分组聚合(grouped aggregation),ORDER BY(排序),LIMIT 等算子。 + +### 执行模型(processing model) + +首先,介绍下执行模型的概念,数据库执行模型定义了在数据库中如何执行一个查询计划。这里简单介绍下两种执行模型:火山模型(Volcano iteration model) 和向量化模型(Vectorization model)。 + +``` + Volcano iteration model Vectorization model + + ┌───────────┐ ┌───────────┐ + │ │ │ │ + │ operator1 │ │ operator1 │ + │ │ │ │ + └─┬─────▲───┘ └─┬─────▲───┘ + │ │ │ │ +next()│ │ tuple next_chunk()│ │ chunk/batch + ┌─▼─────┴───┐ ┌─▼─────┴───┐ + │ │ │ │ + │ operator2 │ │ operator2 │ + │ │ │ │ + └─┬─────▲───┘ └─┬─────▲───┘ + │ │ │ │ +next()│ │ tuple next_chunk()│ │ chunk/batch + ┌─▼─────┴───┐ ┌─▼─────┴───┐ + │ │ │ │ + │ operator3 │ │ operator3 │ + │ │ │ │ + └───────────┘ └───────────┘ +``` + +#### 火山模型 + +在火山模型中,每个查询计划操作符(operator)都实现了一个 `next()` 函数。在每次调用时,操作符要么返回一个元组(tuple),要么在没有更多元组时返回一个空结果。每个操作符在它的子运算符调用 `next()` 来获取元组,然后对它们进行处理,并向上返回结果。 + +#### 向量化执行模型 + +向量化执行模型与火山模型类似,其中每个运算符都有一个生成元组的 `next_chunk()` 方法。然而不同的是,`next_chunk()`方法的每次调用都会获取一组元组而不是仅仅一个元组,这会分摊迭代调用开销。 + +#### MiniOB 中的实现 + +在 MiniOB 中已经实现了上述的两种执行模型,可以通过配置项 `execution_mode` 用于区分这两种执行模型。默认使用火山模型,按 `tuple` 迭代。可通过下述 SQL 语句设置执行模型。 + +将执行模型设置为按 `chunk` 迭代的向量化模型。 + +```sql +SET execution_mode = 'chunk_iterator'; +``` + +将执行模型设置为按 `tuple` 迭代的火山模型。 + +```sql +SET execution_mode = 'tuple_iterator'; +``` + +### 向量化执行模型中算子实现 + +**提示**: 本LAB 中的所有实验均使用 `execution_mode` 为 `chunk_iterator`,存储格式为 `storage format=pax` + +#### group by 实现 + +group by 实现主要位于`src/sql/operator/group_by_vec_physical_operator.cpp`中,这里需要实现分组聚合,即类似 `select a, sum(b) from t group by a`的SQL语句。 +group by 实现采用了基于哈希的分组方式,主要涉及到以下几个步骤: +1. 在 `open()` 函数中,通过调用下层算子的`next(Chunk &chunk)` 来不断获得下层算子的输出结果(Chunk)。对从下层算子获得的 Chunk 进行表达式计算,根据分组列计算出分组位置,并将聚合结果暂存在相应的分组位置中。 +2. 在 `next(Chunk &chunk)` 函数中,将暂存在哈希表中的聚合结果按 Chunk 格式向上返回。 + +#### 实验 + +1. 需要补充 `src/observer/sql/parser/yacc_sql.y` 中 标注`// your code here` 位置的 `aggregation` 和 `group by` 相关的语法。并通过 `src/sql/parser/gen_parser.sh` 生成正确的语法分析代码。 +2. 需要实现 `src/observer/sql/operator/aggregate_vec_physical_operator.cpp` 中标注 `// your code here` 位置的代码。 +3. 需要完整实现位于 `src/observer/sql/operator/group_by_vec_physical_operator.h` 的group by 算子。 +4. 
需要实现 `src/observer/sql/expr/aggregate_hash_table.cpp::StandardAggregateHashTable` 中标注 `// your code here` 位置的代码。 + +#### 测试 + +需通过 `test/case/test/vectorized-aggregation-and-group-by.test`。 + +注意1:训练营中的测试采用对MiniOB/MySQL 输入相同的测试 SQL(MiniOB 中建表语句包含 storage format 选项,MySQL 中不包含),对比 MiniOB 执行结果与 MySQL 执行结果的方式进行。训练营中的测试 case 相比 `test/case/test/vectorized-aggregation-and-group-by.test` 更加多样(包含一些随机生成的 SQL),但不存在更加复杂的 SQL。 + +### ORDER BY / LIMIT 实现 +ORDER BY 排序是数据库的一个基本功能,就是将查询的结果按照指定的字段和顺序进行排序。 + +LIMIT 是一种用于限制查询结果返回行数的关键字。它广泛应用于 SQL 查询中,尤其是在控制输出规模或仅获取部分数据时。 + +ORDER BY 示例: +```sql +select * from t,t1 where t.id=t1.id order by t.id asc,t1.score desc; +示例中就是将结果按照t.id升序、t1.score降序的方式排序。 +``` +其中`asc`表示升序排序,`desc`表示降序。如果不指定排序顺序,就是升序,即asc。 + +LIMIT 示例: +```sql +select * from t,t1 where t.id=t1.id order by t.id asc limit 10; +``` + +**提示**:ORDER BY 与 LIMIT 算子需要参考 GROUP BY 算子的实现,也可参考 `docs/docs/design/miniob-how-to-add-new-sql.md` 中关于如何添加新的 SQL 语法的文档,实现 `ORDER BY` 子句和 `LIMIT` 子句。 + +#### 测试 + +需通过 `test/case/test/vectorized-order-by-and-limit.test`。 + + +## LAB#3 物化视图实现 + +### 背景 +物化视图(MATERIALIZED VIEW)与传统的视图不同,因为它实际上存储了查询结果的数据。简而言之,当您创建一个物化视图时,数据库执行与之关联的 SQL 查询,并将结果集存储在磁盘上。它通过预计算和存储视图的查询结果,减少实时计算,从而提升查询性能并简化复杂查询逻辑,常用于快速报表生成和数据分析场景。 + +物化视图的数据刷新包含全量刷新和增量刷新。 + +全量刷新类似于创建物化视图时,每次先清空物化视图中的数据,然后再把查询得到的结果插入到物化视图中。实现中需要等价于truncate table + insert into select。 + +增量刷新会根据更新周期内更新的数据计算出物化视图增量部分来更新到物化视图中,增量更新的代价比全量更新的代价小很多。 + +关于OceanBase 中物化视图的实现,可以参考官方文档:https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017308 + +### MiniOB 中的实现 +你需要完整实现创建物化视图/对物化视图进行查询的功能。 + +#### 语法 +创建物化视图: + +创建物化视图的 SQL 语句格式如下: + +``` +CREATE MATERIALIZED VIEW view_name + AS select_stmt; +``` + +view_name:指定待创建的物化视图的名称。 +AS view_select_stmt:用于定义物化视图数据的查询(SELECT)语句。该语句用于从基表(普通表或物化视图)中检索数据,并将结果存储到物化视图中。 + +创建物化视图示例: + +```sql +create materialized view mv_v1 as select count(id), sum(age) from t; +``` + +查询物化视图: +与查询普通表语法一致。 + +查询物化视图示例: +```sql +select * from mv_v1; +``` + +#### 测试 +本次实训实验中只需要考虑创建物化视图,查询物化视图的功能,不需要实现物化视图的刷新。 + +## LAB#4 实时分析性能优化 + +### 背景介绍 + +ClickBench 是Clickhouse 提供的分析场景的基准测试,主要测试数据库的实时分析能力。 ClickBench 测试中只包含1张大宽表,100多列,99906797行, 总数据量70GB。 查询覆盖43条SQL。ClickBench 的查询SQL都是单表操作,相对比较简单。 + +我们使用 ClickBench 作为本次实训的实时分析性能优化题目,考虑到测试的数据量和复杂性,我们对 ClickBench 的数据量和查询 SQL 做一些裁剪之后作为本次实训的测试。 + +### 实时分析性能优化实现 +通过本测试需实现的功能列表: +* 支持数据类型 DATE,TEXT,BIGINT的计算。(例如聚合算子/排序等) +* 支持 LAB#2 中的算子。仅支持 `execution_mode` 为 `chunk_iterator` 即可。 + * 聚合算子 + * GROUP BY 算子 + * ORDER BY 算子 + * LIMIT 算子 + +本 lab 以通过 ClickBench 测试为目标,需要实现的具体功能请结合 ClickBench 测试脚本分析。https://github.com/oceanbase/ClickBench/tree/miniob/miniob + + +**提示**: 线上评测中也会包含部分正确性测试。 + +**提示**: 对应的线上训练营为 vldbss-2025: lab4-correctness(正确性测试),vldbss-2025-perf(性能测试),性能测试训练营中返回的分数为测试中包含的所有 SQL 的总执行时间。 + +**提示**: 不允许通过恶意手段对比赛平台、评估系统和环境进行破坏或取得高分。 + +#### 通过脚本运行 ClickBench 测试 +* 下载 ClickBench 代码。 + +``` +git clone https://github.com/oceanbase/ClickBench.git -b miniob +``` +* 下载 ClickBench 数据集并解压。由于 ClickBench 数据集较大,我们只取部分数据集(前 100w 行)进行测试。 + +``` +wget --no-verbose --continue 'https://datasets.clickhouse.com/hits_compatible/hits.tsv.gz' +gzip -d -f hits.tsv.gz +head -n 1000000 hits.tsv > tmp.csv +``` +* 运行 ClickBench。 + +``` +cd miniob/ +./benchmark.sh ~/miniob/build_release/bin/ /tmp/miniob.sock /data/clickdata/tmp.csv +``` + +#### 优化提示 + +1. 当前的存储格式为 PAX 格式,每个页面内存储了所有列的数据。因此如果访问页面的部分列,实际上也会产生整个页面的 IO 访问。因此可以尝试对数据存储格式进行优化,避免不必要的 IO 访问。 +2. 对于 ORDER BY + LIMIT 的查询,本质上是一个 TOP-N 计算。因此如果 ORDER BY 语句块后面还有 LIMIT 语句,可以在优化器中进一步优化执行计划,生成 TOP-N SORT 算子,即采用堆排序来计算 TOP-N 的数据,减少冗余的排序计算。 +3. 
使用 SIMD 指令优化部分计算。通过 SIMD 指令,我们可以优化 MiniOB 向量化执行引擎中的部分批量运算操作,如表达式计算,聚合计算等。关于 SIMD 指令的更多介绍可以参考[文档](./miniob-aggregation-and-group-by.md#simd-介绍)
+4. 在存储中维护统计信息,记录页面内元数据(最大值、最小值、COUNT、SUM等)。对于某些查询可直接通过元数据得到页面内的计算结果。
+5. 受限于线上评测机的规格,不建议使用多线程来优化性能。
+6. ......
diff --git a/docs/docs/how_to_build.md b/docs/docs/how_to_build.md
index 166892a38..c6823f522 100644
--- a/docs/docs/how_to_build.md
+++ b/docs/docs/how_to_build.md
@@ -160,3 +160,32 @@ git config --global core.autocrlf false
 关于该问题的更多细节,请参考[问题来源](https://ask.oceanbase.com/t/topic/35604437/7)。
 关于该问题的进一步分析,请参考[Linux系统下执行sudo命令环境变量失效现象](https://zhuanlan.zhihu.com/p/669332689)。
 也可以将cmake所在路径添加到sudo的PATH变量中来解决上述问题,请参考[sudo命令下环境变量实效的解决方法](https://www.cnblogs.com/xiao-xiaoyang/p/17444600.html)。
+
+
+### 3. Could not find a package configuration file provided by "Libevent"
+在执行 build.sh 脚本时,遇到下面的错误
+![cmake error](images/miniob-build-libevent.png)
+
+通常是因为本机 cmake 版本较高,新版本 cmake 不再兼容 libevent 中过低的 `cmake_minimum_required` 声明,导致 libevent 在 init 阶段没有编译成功。
+
+***解决方法:***
+
+在 [deps/3rd/libevent/CMakeLists.txt](../../deps/3rd/libevent/CMakeLists.txt) 中将 cmake 的最低版本设置
+`cmake_minimum_required(VERSION 3.1 FATAL_ERROR)`
+改为
+`cmake_minimum_required(VERSION 3.1...3.8 FATAL_ERROR)`
+之后重新执行
+```bash
+sudo bash build.sh init
+```
+
+如果你成功解决 libevent 的问题,大概率还会遇到另一个错误:
+![cmake error](images/miniob-build-jsoncpp.png)
+需要在 [deps/3rd/jsoncpp/jsoncppConfig.cmake.in](../../deps/3rd/jsoncpp/jsoncppConfig.cmake.in) 中将 cmake 策略
+`cmake_policy(VERSION 3.0)`
+改为
+`cmake_policy(VERSION 3.0...3.8)`
+之后重新执行
+```bash
+sudo bash build.sh init
+```
\ No newline at end of file
diff --git a/docs/docs/images/miniob-build-jsoncpp.png b/docs/docs/images/miniob-build-jsoncpp.png
new file mode 100644
index 000000000..663d9ecdb
Binary files /dev/null and b/docs/docs/images/miniob-build-jsoncpp.png differ
diff --git a/docs/docs/images/miniob-build-libevent.png b/docs/docs/images/miniob-build-libevent.png
new file mode 100644
index 000000000..edc127d7b
Binary files /dev/null and b/docs/docs/images/miniob-build-libevent.png differ
diff --git a/src/common/lang/comparator.cpp b/src/common/lang/comparator.cpp
index 1f76a2af8..407ae51b5 100644
--- a/src/common/lang/comparator.cpp
+++ b/src/common/lang/comparator.cpp
@@ -32,6 +32,19 @@ int compare_int(void *arg1, void *arg2)
   }
 }
 
+int compare_int64(void *arg1, void *arg2)
+{
+  int64_t v1 = *(int64_t *)arg1;
+  int64_t v2 = *(int64_t *)arg2;
+  if (v1 > v2) {
+    return 1;
+  } else if (v1 < v2) {
+    return -1;
+  } else {
+    return 0;
+  }
+}
+
 int compare_float(void *arg1, void *arg2)
 {
   float v1 = *(float *)arg1;
diff --git a/src/common/lang/comparator.h b/src/common/lang/comparator.h
index fa5e10f8b..3a6f5cf34 100644
--- a/src/common/lang/comparator.h
+++ b/src/common/lang/comparator.h
@@ -17,6 +17,7 @@ See the Mulan PSL v2 for more details. 
*/ namespace common { int compare_int(void *arg1, void *arg2); +int compare_int64(void *arg1, void *arg2); int compare_float(void *arg1, void *arg2); int compare_string(void *arg1, int arg1_max_length, void *arg2, int arg2_max_length); diff --git a/src/observer/common/type/attr_type.cpp b/src/observer/common/type/attr_type.cpp index 131342db8..e8af0084a 100644 --- a/src/observer/common/type/attr_type.cpp +++ b/src/observer/common/type/attr_type.cpp @@ -31,3 +31,13 @@ AttrType attr_type_from_string(const char *s) } return AttrType::UNDEFINED; } + +bool is_numerical_type(AttrType type) +{ + return (type == AttrType::INTS || type == AttrType::FLOATS); +} + +bool is_string_type(AttrType type) +{ + return (type == AttrType::CHARS); +} \ No newline at end of file diff --git a/src/observer/common/type/attr_type.h b/src/observer/common/type/attr_type.h index d1e56cf0d..623787ed5 100644 --- a/src/observer/common/type/attr_type.h +++ b/src/observer/common/type/attr_type.h @@ -27,3 +27,5 @@ enum class AttrType const char *attr_type_to_string(AttrType type); AttrType attr_type_from_string(const char *s); +bool is_numerical_type(AttrType type); +bool is_string_type(AttrType type); diff --git a/src/observer/common/type/data_type.cpp b/src/observer/common/type/data_type.cpp index 5dc3d410e..6df3fa6ac 100644 --- a/src/observer/common/type/data_type.cpp +++ b/src/observer/common/type/data_type.cpp @@ -14,6 +14,9 @@ See the Mulan PSL v2 for more details. */ #include "common/type/data_type.h" #include "common/type/vector_type.h" +// Todo: 实现新数据类型 +// your code here + array, static_cast(AttrType::MAXTYPE)> DataType::type_instances_ = { make_unique(AttrType::UNDEFINED), make_unique(), diff --git a/src/observer/common/type/data_type.h b/src/observer/common/type/data_type.h index a44a04a08..54f0f3dc0 100644 --- a/src/observer/common/type/data_type.h +++ b/src/observer/common/type/data_type.h @@ -17,6 +17,7 @@ See the Mulan PSL v2 for more details. */ #include "common/type/attr_type.h" class Value; +class Column; /** * @brief 定义了数据类型相关的操作,比如比较运算、算术运算等 @@ -24,6 +25,7 @@ class Value; * @details 数据类型定义的算术运算中,比如 add、subtract 等,将按照当前数据类型设置最终结果值的类型。 * 参与运算的参数类型不一定相同,不同的类型进行运算是否能够支持需要参考各个类型的实现。 */ + class DataType { public: @@ -47,6 +49,8 @@ class DataType */ virtual int compare(const Value &left, const Value &right) const { return INT32_MAX; } + virtual int compare(const Column &left, const Column &right, int left_idx, int right_idx) const { return INT32_MAX; } + /** * @brief 计算 left + right,并将结果保存到 result 中 */ diff --git a/src/observer/common/type/float_type.cpp b/src/observer/common/type/float_type.cpp index fcf89a7cc..207f705a7 100644 --- a/src/observer/common/type/float_type.cpp +++ b/src/observer/common/type/float_type.cpp @@ -15,16 +15,25 @@ See the Mulan PSL v2 for more details. 
*/ #include "common/value.h" #include "common/lang/limits.h" #include "common/value.h" +#include "storage/common/column.h" int FloatType::compare(const Value &left, const Value &right) const { - ASSERT(left.attr_type() == AttrType::FLOATS, "left type is not integer"); + ASSERT(left.attr_type() == AttrType::FLOATS, "left type is not float"); ASSERT(right.attr_type() == AttrType::INTS || right.attr_type() == AttrType::FLOATS, "right type is not numeric"); float left_val = left.get_float(); float right_val = right.get_float(); return common::compare_float((void *)&left_val, (void *)&right_val); } +int FloatType::compare(const Column &left, const Column &right, int left_idx, int right_idx) const +{ + ASSERT(left.attr_type() == AttrType::FLOATS, "left type is not float"); + ASSERT(right.attr_type() == AttrType::FLOATS, "right type is not float"); + return common::compare_float((void *)&((float*)left.data())[left_idx], + (void *)&((float*)right.data())[right_idx]); +} + RC FloatType::add(const Value &left, const Value &right, Value &result) const { result.set_float(left.get_float() + right.get_float()); diff --git a/src/observer/common/type/float_type.h b/src/observer/common/type/float_type.h index fd4a7fcb5..9bf0be848 100644 --- a/src/observer/common/type/float_type.h +++ b/src/observer/common/type/float_type.h @@ -23,6 +23,7 @@ class FloatType : public DataType virtual ~FloatType() = default; int compare(const Value &left, const Value &right) const override; + int compare(const Column &left, const Column &right, int left_idx, int right_idx) const override; RC add(const Value &left, const Value &right, Value &result) const override; RC subtract(const Value &left, const Value &right, Value &result) const override; diff --git a/src/observer/common/type/integer_type.cpp b/src/observer/common/type/integer_type.cpp index 5fc2eb817..48f20f7df 100644 --- a/src/observer/common/type/integer_type.cpp +++ b/src/observer/common/type/integer_type.cpp @@ -13,6 +13,7 @@ See the Mulan PSL v2 for more details. 
*/ #include "common/log/log.h" #include "common/type/integer_type.h" #include "common/value.h" +#include "storage/common/column.h" int IntegerType::compare(const Value &left, const Value &right) const { @@ -28,6 +29,14 @@ int IntegerType::compare(const Value &left, const Value &right) const return INT32_MAX; } +int IntegerType::compare(const Column &left, const Column &right, int left_idx, int right_idx) const +{ + ASSERT(left.attr_type() == AttrType::INTS, "left type is not integer"); + ASSERT(right.attr_type() == AttrType::INTS, "right type is not integer"); + return common::compare_int((void *)&((int*)left.data())[left_idx], + (void *)&((int*)right.data())[right_idx]); +} + RC IntegerType::cast_to(const Value &val, AttrType type, Value &result) const { switch (type) { diff --git a/src/observer/common/type/integer_type.h b/src/observer/common/type/integer_type.h index 546ed32de..9b6472e34 100644 --- a/src/observer/common/type/integer_type.h +++ b/src/observer/common/type/integer_type.h @@ -23,6 +23,7 @@ class IntegerType : public DataType virtual ~IntegerType() {} int compare(const Value &left, const Value &right) const override; + int compare(const Column &left, const Column &right, int left_idx, int right_idx) const override; RC add(const Value &left, const Value &right, Value &result) const override; RC subtract(const Value &left, const Value &right, Value &result) const override; @@ -31,6 +32,16 @@ class IntegerType : public DataType RC cast_to(const Value &val, AttrType type, Value &result) const override; + int cast_cost(const AttrType type) override + { + if (type == AttrType::INTS) { + return 0; + } else if (type == AttrType::FLOATS) { + return 1; + } + return INT32_MAX; + } + RC set_value_from_str(Value &val, const string &data) const override; RC to_string(const Value &val, string &result) const override; diff --git a/src/observer/common/type/string_t.h b/src/observer/common/type/string_t.h new file mode 100644 index 000000000..45ed229b5 --- /dev/null +++ b/src/observer/common/type/string_t.h @@ -0,0 +1,124 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. 
*/ + +#pragma once + +#include +#include + +using namespace std; +struct string_t { +public: + static constexpr int INLINE_LENGTH = 12; + + string_t() = default; + + explicit string_t(uint32_t len) { + value.inlined.length = len; + } + + string_t(const char *data, uint32_t len) { + init(data, len); + } + + ~string_t() + { + reset(); + } + + void init(const char *data, uint32_t len) { + value.inlined.length = len; + if (is_inlined()) { + memset(value.inlined.inlined, 0, INLINE_LENGTH); + if (size() == 0) { + return; + } + memcpy(value.inlined.inlined, data, size()); + } else { + value.pointer.ptr = (char *)data; + } + } + + void reset() { + if (is_inlined()) { + memset(value.inlined.inlined, 0, INLINE_LENGTH); + } else { + value.pointer.ptr = nullptr; + } + value.inlined.length = 0; + } + + string_t(const char *data) + : string_t(data, strlen(data)) { + } + string_t(const string &value) + : string_t(value.c_str(), value.size()) { + } + + bool is_inlined() const { + return size() <= INLINE_LENGTH; + } + + const char *data() const { + return is_inlined() ? value.inlined.inlined : value.pointer.ptr; + } + + char *get_data_writeable() const { + return is_inlined() ? (char *)value.inlined.inlined : value.pointer.ptr; + } + + int size() const { + return value.inlined.length; + } + + bool empty() const { + return value.inlined.length == 0; + } + + string get_string() const { + return string(data(), size()); + } + + bool operator==(const string_t &r) const { + if (this->size() != r.size()) { + return false; + } + return (memcmp(this->data(), r.data(), this->size()) == 0); + } + + bool operator!=(const string_t &r) const { + return !(*this == r); + } + + bool operator>(const string_t &r) const { + const uint32_t left_length = this->size(); + const uint32_t right_length = r.size(); + const uint32_t min_length = std::min(left_length, right_length); + + auto memcmp_res = memcmp(this->data(), r.data(), min_length); + return memcmp_res > 0 || (memcmp_res == 0 && left_length > right_length); + + } + bool operator<(const string_t &r) const { + return r > *this; + } + + struct Inlined { + uint32_t length; + char inlined[12]; + }; + union { + struct { + uint32_t length; + char *ptr; + } pointer; + Inlined inlined; + } value; +}; \ No newline at end of file diff --git a/src/observer/common/value.cpp b/src/observer/common/value.cpp index 6d83a58fa..424c67350 100644 --- a/src/observer/common/value.cpp +++ b/src/observer/common/value.cpp @@ -28,6 +28,9 @@ Value::Value(bool val) { set_boolean(val); } Value::Value(const char *s, int len /*= 0*/) { set_string(s, len); } +Value::Value(const string_t& s) { set_string(s.data(), s.size()); } + + Value::Value(const Value &other) { this->attr_type_ = other.attr_type_; @@ -175,6 +178,19 @@ void Value::set_string(const char *s, int len /*= 0*/) } } +void Value::set_empty_string(int len) +{ + reset(); + attr_type_ = AttrType::CHARS; + + own_data_ = true; + value_.pointer_value_ = new char[len + 1]; + length_ = len; + memset(value_.pointer_value_, 0, len); + value_.pointer_value_[len] = '\0'; + +} + void Value::set_value(const Value &value) { switch (value.attr_type_) { @@ -206,14 +222,14 @@ void Value::set_string_from_other(const Value &other) } } -const char *Value::data() const +char *Value::data() const { switch (attr_type_) { case AttrType::CHARS: { return value_.pointer_value_; } break; default: { - return (const char *)&value_; + return (char *)&value_; } break; } } @@ -289,6 +305,12 @@ float Value::get_float() const string Value::get_string() const { return 
this->to_string(); } +string_t Value::get_string_t() const +{ + ASSERT(attr_type_ == AttrType::CHARS, "attr type is not CHARS"); + return string_t(value_.pointer_value_, length_); +} + bool Value::get_boolean() const { switch (attr_type_) { diff --git a/src/observer/common/value.h b/src/observer/common/value.h index 49d514fc5..61b9a734c 100644 --- a/src/observer/common/value.h +++ b/src/observer/common/value.h @@ -18,6 +18,7 @@ See the Mulan PSL v2 for more details. */ #include "common/lang/memory.h" #include "common/type/attr_type.h" #include "common/type/data_type.h" +#include "common/type/string_t.h" /** * @brief 属性的值 @@ -46,6 +47,7 @@ class Value final explicit Value(float val); explicit Value(bool val); explicit Value(const char *s, int len = 0); + explicit Value(const string_t& val); Value(const Value &other); Value(Value &&other); @@ -95,7 +97,7 @@ class Value final int compare(const Value &other) const; - const char *data() const; + char *data() const; int length() const { return length_; } AttrType attr_type() const { return attr_type_; } @@ -105,15 +107,17 @@ class Value final * 获取对应的值 * 如果当前的类型与期望获取的类型不符,就会执行转换操作 */ - int get_int() const; - float get_float() const; - string get_string() const; - bool get_boolean() const; + int get_int() const; + float get_float() const; + string get_string() const; + string_t get_string_t() const; + bool get_boolean() const; public: void set_int(int val); void set_float(float val); void set_string(const char *s, int len = 0); + void set_empty_string(int len); void set_string_from_other(const Value &other); private: diff --git a/src/observer/sql/executor/load_data_executor.cpp b/src/observer/sql/executor/load_data_executor.cpp index 29ffecd5d..47950e041 100644 --- a/src/observer/sql/executor/load_data_executor.cpp +++ b/src/observer/sql/executor/load_data_executor.cpp @@ -18,6 +18,7 @@ See the Mulan PSL v2 for more details. 
*/ #include "event/sql_event.h" #include "sql/executor/sql_result.h" #include "sql/stmt/load_data_stmt.h" +#include "storage/common/chunk.h" using namespace common; @@ -28,7 +29,7 @@ RC LoadDataExecutor::execute(SQLStageEvent *sql_event) LoadDataStmt *stmt = static_cast(sql_event->stmt()); Table *table = stmt->table(); const char *file_name = stmt->filename(); - load_data(table, file_name, sql_result); + load_data(table, file_name, stmt->terminated(), stmt->enclosed(), sql_result); return rc; } @@ -62,6 +63,10 @@ RC insert_record_from_file( common::strip(file_value); } rc = DataType::type_instance(field->type())->set_value_from_str(record_values[i], file_value); + if (rc != RC::SUCCESS) { + LOG_WARN("Failed to deserialize value from string: %s, type=%d", file_value.c_str(), field->type()); + return rc; + } } if (RC::SUCCESS == rc) { @@ -76,8 +81,11 @@ RC insert_record_from_file( return rc; } -void LoadDataExecutor::load_data(Table *table, const char *file_name, SqlResult *sql_result) + +// TODO: pax format and row format +void LoadDataExecutor::load_data(Table *table, const char *file_name, char terminated, char enclosed, SqlResult *sql_result) { + // your code here stringstream result_string; fstream fs; @@ -99,7 +107,7 @@ void LoadDataExecutor::load_data(Table *table, const char *file_name, SqlResult vector file_values; const string delim("|"); int line_num = 0; - int insertion_count = 0; + [[maybe_unused]]int insertion_count = 0; RC rc = RC::SUCCESS; while (!fs.eof() && RC::SUCCESS == rc) { getline(fs, line); @@ -111,23 +119,30 @@ void LoadDataExecutor::load_data(Table *table, const char *file_name, SqlResult file_values.clear(); common::split_string(line, delim, file_values); stringstream errmsg; - rc = insert_record_from_file(table, file_values, record_values, errmsg); - if (rc != RC::SUCCESS) { - result_string << "Line:" << line_num << " insert record failed:" << errmsg.str() << ". error:" << strrc(rc) - << endl; + + if (table->table_meta().storage_format() == StorageFormat::ROW_FORMAT) { + rc = insert_record_from_file(table, file_values, record_values, errmsg); + if (rc != RC::SUCCESS) { + result_string << "Line:" << line_num << " insert record failed:" << errmsg.str() << ". error:" << strrc(rc) + << endl; + } else { + insertion_count++; + } + } else if (table->table_meta().storage_format() == StorageFormat::PAX_FORMAT) { + // your code here + // Todo: 参照insert_record_from_file实现 + rc = RC::UNIMPLEMENTED; } else { - insertion_count++; + rc = RC::UNSUPPORTED; + result_string << "Unsupported storage format: " << strrc(rc) << endl; } } fs.close(); struct timespec end_time; clock_gettime(CLOCK_MONOTONIC, &end_time); - long cost_nano = (end_time.tv_sec - begin_time.tv_sec) * 1000000000L + (end_time.tv_nsec - begin_time.tv_nsec); if (RC::SUCCESS == rc) { - result_string << strrc(rc) << ". 
total " << line_num << " line(s) handled and " << insertion_count - << " record(s) loaded, total cost " << cost_nano / 1000000000.0 << " second(s)" << endl; + result_string << strrc(rc); } sql_result->set_return_code(RC::SUCCESS); - sql_result->set_state_string(result_string.str()); } diff --git a/src/observer/sql/executor/load_data_executor.h b/src/observer/sql/executor/load_data_executor.h index 708a6f3ba..42f54d7dc 100644 --- a/src/observer/sql/executor/load_data_executor.h +++ b/src/observer/sql/executor/load_data_executor.h @@ -33,5 +33,5 @@ class LoadDataExecutor RC execute(SQLStageEvent *sql_event); private: - void load_data(Table *table, const char *file_name, SqlResult *sql_result); + void load_data(Table *table, const char *file_name, char terminated, char enclosed, SqlResult *sql_result); }; diff --git a/src/observer/sql/expr/aggregate_hash_table.cpp b/src/observer/sql/expr/aggregate_hash_table.cpp index 7be3d878c..46aaaf993 100644 --- a/src/observer/sql/expr/aggregate_hash_table.cpp +++ b/src/observer/sql/expr/aggregate_hash_table.cpp @@ -9,13 +9,46 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ #include "sql/expr/aggregate_hash_table.h" +#include "sql/expr/aggregate_state.h" // ----------------------------------StandardAggregateHashTable------------------ RC StandardAggregateHashTable::add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) { - // your code here - exit(-1); + if (groups_chunk.rows() != aggrs_chunk.rows()) { + LOG_WARN("groups_chunk and aggrs_chunk have different rows: %d, %d", groups_chunk.rows(), aggrs_chunk.rows()); + return RC::INVALID_ARGUMENT; + } + for (int i = 0; i < groups_chunk.rows(); i++) { + vector group_by_values; + vector aggr_values; + + for (int j = 0; j < groups_chunk.column_num(); j++) { + group_by_values.emplace_back(groups_chunk.get_value(j, i)); + } + + auto it = aggr_values_.find(group_by_values); + if (it == aggr_values_.end()) { + for (size_t j = 0; j < aggr_types_.size(); j++) { + void * state_ptr = create_aggregate_state(aggr_types_[j], aggr_child_types_[j]); + if (state_ptr == nullptr) { + LOG_WARN("create aggregate state failed"); + return RC::INTERNAL; + } + aggr_values.emplace_back(state_ptr); + } + aggr_values_.emplace(group_by_values, aggr_values); + } + auto &aggr = aggr_values_.find(group_by_values)->second; + for (size_t aggr_idx = 0; aggr_idx < aggr.size(); aggr_idx++) { + RC rc = aggregate_state_update_by_value(aggr[aggr_idx], aggr_types_[aggr_idx], aggr_child_types_[aggr_idx], aggrs_chunk.get_value(aggr_idx, i)); + if (rc != RC::SUCCESS) { + LOG_WARN("update aggregate state failed"); + return rc; + } + } + } + return RC::SUCCESS; } void StandardAggregateHashTable::Scanner::open_scan() @@ -26,18 +59,28 @@ void StandardAggregateHashTable::Scanner::open_scan() RC StandardAggregateHashTable::Scanner::next(Chunk &output_chunk) { + RC rc = RC::SUCCESS; if (it_ == end_) { return RC::RECORD_EOF; } - while (it_ != end_ && output_chunk.rows() <= output_chunk.capacity()) { + while (it_ != end_ && output_chunk.rows() < output_chunk.capacity()) { auto &group_by_values = it_->first; auto &aggrs = it_->second; for (int i = 0; i < output_chunk.column_num(); i++) { auto col_idx = output_chunk.column_ids(i); if (col_idx >= static_cast(group_by_values.size())) { - output_chunk.column(i).append_one((char *)aggrs[col_idx - group_by_values.size()].data()); + int aggr_real_idx = col_idx - group_by_values.size(); + rc = finialize_aggregate_state(aggrs[aggr_real_idx], 
hash_table_->aggr_types_[aggr_real_idx], + hash_table_->aggr_child_types_[aggr_real_idx], output_chunk.column(i)); + if (rc != RC::SUCCESS) { + LOG_WARN("finialize aggregate state failed"); + return rc; + } } else { - output_chunk.column(i).append_one((char *)group_by_values[col_idx].data()); + if (OB_FAIL(output_chunk.column(i).append_value(group_by_values[col_idx]))) { + LOG_WARN("append value failed"); + return rc; + } } } it_++; diff --git a/src/observer/sql/expr/aggregate_hash_table.h b/src/observer/sql/expr/aggregate_hash_table.h index 83e4d4168..d5f484914 100644 --- a/src/observer/sql/expr/aggregate_hash_table.h +++ b/src/observer/sql/expr/aggregate_hash_table.h @@ -47,6 +47,8 @@ class AggregateHashTable virtual RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) = 0; virtual ~AggregateHashTable() = default; + vector aggr_types_; + vector aggr_child_types_; }; class StandardAggregateHashTable : public AggregateHashTable @@ -63,7 +65,7 @@ class StandardAggregateHashTable : public AggregateHashTable }; public: - using StandardHashTable = unordered_map, vector, VectorHash, VectorEqual>; + using StandardHashTable = unordered_map, vector, VectorHash, VectorEqual>; class Scanner : public AggregateHashTable::Scanner { public: @@ -84,20 +86,26 @@ class StandardAggregateHashTable : public AggregateHashTable ASSERT(expr->type() == ExprType::AGGREGATION, "expect aggregate expression"); auto *aggregation_expr = static_cast(expr); aggr_types_.push_back(aggregation_expr->aggregate_type()); + aggr_child_types_.push_back(aggregation_expr->value_type()); + } + } + virtual ~StandardAggregateHashTable() + { + for (auto &aggr : aggr_values_) { + for (auto &state : aggr.second) { + free(state); + } } } - virtual ~StandardAggregateHashTable() {} RC add_chunk(Chunk &groups_chunk, Chunk &aggrs_chunk) override; StandardHashTable::iterator begin() { return aggr_values_.begin(); } StandardHashTable::iterator end() { return aggr_values_.end(); } -private: /// group by values -> aggregate values StandardHashTable aggr_values_; - vector aggr_types_; }; /** diff --git a/src/observer/sql/expr/aggregate_state.cpp b/src/observer/sql/expr/aggregate_state.cpp index 220253544..78db6e0c9 100644 --- a/src/observer/sql/expr/aggregate_state.cpp +++ b/src/observer/sql/expr/aggregate_state.cpp @@ -9,6 +9,7 @@ MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. 
*/ #include "sql/expr/aggregate_state.h" +#include #ifdef USE_SIMD #include "common/math/simd_util.h" @@ -29,5 +30,163 @@ void SumState::update(const T *values, int size) #endif } +template +void AvgState::update(const T *values, int size) +{ + for (int i = 0; i < size; ++i) { + value += values[i]; + } + count += size; +} + +template +void CountState::update(const T *values, int size) +{ + value += size; +} + +void* create_aggregate_state(AggregateExpr::Type aggr_type, AttrType attr_type) +{ + void* state_ptr = nullptr; + if (aggr_type == AggregateExpr::Type::SUM) { + if (attr_type == AttrType::INTS) { + state_ptr = malloc(sizeof(SumState)); + new (state_ptr) SumState(); + } else if (attr_type == AttrType::FLOATS) { + state_ptr = malloc(sizeof(SumState)); + new (state_ptr) SumState(); + } else { + LOG_WARN("unsupported aggregate value type"); + } + } else if (aggr_type == AggregateExpr::Type::COUNT) { + state_ptr = malloc(sizeof(CountState)); + new (state_ptr) CountState(); + } else if (aggr_type == AggregateExpr::Type::AVG) { + if (attr_type == AttrType::INTS) { + state_ptr = malloc(sizeof(AvgState)); + new (state_ptr) AvgState(); + } else if (attr_type == AttrType::FLOATS) { + state_ptr = malloc(sizeof(AvgState)); + new (state_ptr) AvgState(); + } else { + LOG_WARN("unsupported aggregate value type"); + } + } else { + LOG_WARN("unsupported aggregator type"); + } + return state_ptr; +} + +RC aggregate_state_update_by_value(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, const Value& val) +{ + RC rc = RC::SUCCESS; + if (aggr_type == AggregateExpr::Type::SUM) { + if (attr_type == AttrType::INTS) { + static_cast*>(state)->update(val.get_int()); + } else if (attr_type == AttrType::FLOATS) { + static_cast*>(state)->update(val.get_float()); + } else { + LOG_WARN("unsupported aggregate value type"); + return RC::UNIMPLEMENTED; + } + } else if (aggr_type == AggregateExpr::Type::COUNT) { + static_cast*>(state)->update(1); + } else if (aggr_type == AggregateExpr::Type::AVG) { + if (attr_type == AttrType::INTS) { + static_cast*>(state)->update(val.get_int()); + } else if (attr_type == AttrType::FLOATS) { + static_cast*>(state)->update(val.get_float()); + } else { + LOG_WARN("unsupported aggregate value type"); + return RC::UNIMPLEMENTED; + } + } else { + LOG_WARN("unsupported aggregator type"); + return RC::UNIMPLEMENTED; + } + return rc; +} + +template +void append_to_column(void *state, Column &column) +{ + STATE *state_ptr = reinterpret_cast(state); + T res = state_ptr->template finalize(); + column.append_one((char *)&res); +} + +RC finialize_aggregate_state(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, Column& col) +{ + RC rc = RC::SUCCESS; + if ( aggr_type == AggregateExpr::Type::SUM) { + if (attr_type == AttrType::INTS) { + append_to_column, int>(state, col); + } else if (attr_type == AttrType::FLOATS) { + append_to_column, float>(state, col); + } else { + rc = RC::UNIMPLEMENTED; + LOG_WARN("unsupported aggregate value type"); + } + } else if (aggr_type == AggregateExpr::Type::COUNT) { + append_to_column, int>(state, col); + } else if (aggr_type == AggregateExpr::Type::AVG) { + if (attr_type == AttrType::INTS) { + append_to_column, float>(state, col); + } else if (attr_type == AttrType::FLOATS) { + append_to_column, float>(state, col); + } else { + rc = RC::UNIMPLEMENTED; + LOG_WARN("unsupported aggregate value type"); + }// + } else { + rc = RC::UNIMPLEMENTED; + LOG_WARN("unsupported aggregator type"); + } + return rc; +} + +template +void 
update_aggregate_state(void *state, const Column &column) +{ + STATE *state_ptr = reinterpret_cast(state); + T * data = (T *)column.data(); + state_ptr->update(data, column.count()); +} + +RC aggregate_state_update_by_column(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, Column& col) +{ + RC rc = RC::SUCCESS; + if (aggr_type == AggregateExpr::Type::SUM) { + if (attr_type == AttrType::INTS) { + update_aggregate_state, int>(state, col); + } else if (attr_type == AttrType::FLOATS) { + update_aggregate_state, float>(state, col); + } else { + LOG_WARN("unsupported aggregate value type"); + rc = RC::UNIMPLEMENTED; + } + } else if (aggr_type == AggregateExpr::Type::COUNT) { + update_aggregate_state, int>(state, col); + } else if (aggr_type == AggregateExpr::Type::AVG) { + if (attr_type == AttrType::INTS) { + update_aggregate_state, int>(state, col); + } else if (attr_type == AttrType::FLOATS) { + update_aggregate_state, float>(state, col); + } else { + LOG_WARN("unsupported aggregate value type"); + rc = RC::UNIMPLEMENTED; + } + } else { + LOG_WARN("unsupported aggregator type"); + rc = RC::UNIMPLEMENTED; + } + return rc; +} + template class SumState; template class SumState; + +template class CountState; + +template class AvgState; +template class AvgState; diff --git a/src/observer/sql/expr/aggregate_state.h b/src/observer/sql/expr/aggregate_state.h index 785ae3379..86167d467 100644 --- a/src/observer/sql/expr/aggregate_state.h +++ b/src/observer/sql/expr/aggregate_state.h @@ -8,6 +8,8 @@ EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. See the Mulan PSL v2 for more details. */ +#include "sql/expr/expression.h" +#include "common/type/attr_type.h" template class SumState { @@ -15,4 +17,59 @@ class SumState SumState() : value(0) {} T value; void update(const T *values, int size); -}; \ No newline at end of file + void update(const T &value) + { + this->value += value; + } + template + U finalize() + { + return (U)value; + } +}; + +template +class CountState +{ +public: + CountState() : value(0) {} + int value; + void update(const T *values, int size); + void update(const T &value) + { + this->value++; + } + template + U finalize() + { + return (U)value; + } +}; + +template +class AvgState +{ +public: + AvgState() : value(0),count(0) {} + T value; + int count = 0; + void update(const T *values, int size); + void update(const T &value) + { + this->value += value; + this->count++; + } + template + U finalize() + { + return (U)((float)value / (float)count); + } + +}; + +void* create_aggregate_state(AggregateExpr::Type aggr_type, AttrType attr_type); + +RC aggregate_state_update_by_value(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, const Value& val); +RC aggregate_state_update_by_column(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, Column& col); + +RC finialize_aggregate_state(void *state, AggregateExpr::Type aggr_type, AttrType attr_type, Column& col); \ No newline at end of file diff --git a/src/observer/sql/expr/expression.cpp b/src/observer/sql/expr/expression.cpp index b8bf74c55..df5d1c807 100644 --- a/src/observer/sql/expr/expression.cpp +++ b/src/observer/sql/expr/expression.cpp @@ -67,7 +67,7 @@ RC ValueExpr::get_value(const Tuple &tuple, Value &value) const RC ValueExpr::get_column(Chunk &chunk, Column &column) { - column.init(value_); + column.init(value_, chunk.rows()); return RC::SUCCESS; } @@ -99,6 +99,26 @@ RC CastExpr::get_value(const Tuple &tuple, Value 
&result) const return cast(value, result); } +RC CastExpr::get_column(Chunk &chunk, Column &column) +{ + Column child_column; + RC rc = child_->get_column(chunk, child_column); + if (rc != RC::SUCCESS) { + return rc; + } + column.init(cast_type_, child_column.attr_len()); + for (int i = 0; i < child_column.count(); ++i) { + Value value = child_column.get_value(i); + Value cast_value; + rc = cast(value, cast_value); + if (rc != RC::SUCCESS) { + return rc; + } + column.append_value(cast_value); + } + return rc; +} + RC CastExpr::try_get_value(Value &result) const { Value value; @@ -222,8 +242,26 @@ RC ComparisonExpr::eval(Chunk &chunk, vector &select) rc = compare_column(left_column, right_column, select); } else if (left_column.attr_type() == AttrType::FLOATS) { rc = compare_column(left_column, right_column, select); + } else if (left_column.attr_type() == AttrType::CHARS) { + int rows = 0; + if (left_column.column_type() == Column::Type::CONSTANT_COLUMN) { + rows = right_column.count(); + } else { + rows = left_column.count(); + } + for (int i = 0; i < rows; ++i) { + Value left_val = left_column.get_value(i); + Value right_val = right_column.get_value(i); + bool result = false; + rc = compare_value(left_val, right_val, result); + if (rc != RC::SUCCESS) { + LOG_WARN("failed to compare tuple cells. rc=%s", strrc(rc)); + return rc; + } + select[i] &= result ? 1 : 0; + } + } else { - // TODO: support string compare LOG_WARN("unsupported data type %d", left_column.attr_type()); return RC::INTERNAL; } @@ -308,7 +346,8 @@ AttrType ArithmeticExpr::value_type() const return left_->value_type(); } - if (left_->value_type() == AttrType::INTS && right_->value_type() == AttrType::INTS && + if ((left_->value_type() == AttrType::INTS) && + (right_->value_type() == AttrType::INTS) && arithmetic_type_ != Type::DIV) { return AttrType::INTS; } diff --git a/src/observer/sql/expr/expression.h b/src/observer/sql/expr/expression.h index ec6b6b082..959866d04 100644 --- a/src/observer/sql/expr/expression.h +++ b/src/observer/sql/expr/expression.h @@ -271,6 +271,7 @@ class CastExpr : public Expression ExprType type() const override { return ExprType::CAST; } RC get_value(const Tuple &tuple, Value &value) const override; + RC get_column(Chunk &chunk, Column &column) override; RC try_get_value(Value &value) const override; @@ -413,10 +414,7 @@ class ArithmeticExpr : public Expression AttrType value_type() const override; int value_length() const override { - if (!right_) { - return left_->value_length(); - } - return 4; // sizeof(float) or sizeof(int) + return std::max(left_->value_length(), right_ ? 
right_->value_length() : 0); }; RC get_value(const Tuple &tuple, Value &value) const override; @@ -493,8 +491,24 @@ class AggregateExpr : public Expression ExprType type() const override { return ExprType::AGGREGATION; } - AttrType value_type() const override { return child_->value_type(); } - int value_length() const override { return child_->value_length(); } + AttrType value_type() const override { + if (aggregate_type_ == Type::COUNT) { + return AttrType::INTS; + } else if (aggregate_type_ == Type::AVG) { + return AttrType::FLOATS; + } else { + return child_->value_type(); + } + } + int value_length() const override { + if (aggregate_type_ == Type::COUNT) { + return sizeof(int); + } else if (aggregate_type_ == Type::AVG) { + return sizeof(float); + } else { + return child_->value_length(); + } + } RC get_value(const Tuple &tuple, Value &value) const override; diff --git a/src/observer/sql/operator/aggregate_vec_physical_operator.cpp b/src/observer/sql/operator/aggregate_vec_physical_operator.cpp index daec1864a..cace563a4 100644 --- a/src/observer/sql/operator/aggregate_vec_physical_operator.cpp +++ b/src/observer/sql/operator/aggregate_vec_physical_operator.cpp @@ -33,22 +33,10 @@ AggregateVecPhysicalOperator::AggregateVecPhysicalOperator(vector auto &expr = aggregate_expressions_[i]; ASSERT(expr->type() == ExprType::AGGREGATION, "expected an aggregation expression"); auto *aggregate_expr = static_cast(expr); - - if (aggregate_expr->aggregate_type() == AggregateExpr::Type::SUM) { - if (aggregate_expr->value_type() == AttrType::INTS) { - void *aggr_value = malloc(sizeof(SumState)); - ((SumState *)aggr_value)->value = 0; - aggr_values_.insert(aggr_value); - output_chunk_.add_column(make_unique(AttrType::INTS, sizeof(int)), i); - } else if (aggregate_expr->value_type() == AttrType::FLOATS) { - void *aggr_value = malloc(sizeof(SumState)); - ((SumState *)aggr_value)->value = 0; - aggr_values_.insert(aggr_value); - output_chunk_.add_column(make_unique(AttrType::FLOATS, sizeof(float)), i); - } - } else { - ASSERT(false, "not supported aggregation type"); - } + void *state_ptr = create_aggregate_state(aggregate_expr->aggregate_type(), aggregate_expr->child()->value_type()); + ASSERT(state_ptr != nullptr, "failed to create aggregate state"); + aggr_values_.insert(state_ptr); + output_chunk_.add_column(make_unique(aggregate_expr->value_type(), aggregate_expr->value_length()), i); } } @@ -69,16 +57,10 @@ RC AggregateVecPhysicalOperator::open(Trx *trx) value_expressions_[aggr_idx]->get_column(chunk_, column); ASSERT(aggregate_expressions_[aggr_idx]->type() == ExprType::AGGREGATION, "expect aggregate expression"); auto *aggregate_expr = static_cast(aggregate_expressions_[aggr_idx]); - if (aggregate_expr->aggregate_type() == AggregateExpr::Type::SUM) { - if (aggregate_expr->value_type() == AttrType::INTS) { - update_aggregate_state, int>(aggr_values_.at(aggr_idx), column); - } else if (aggregate_expr->value_type() == AttrType::FLOATS) { - update_aggregate_state, float>(aggr_values_.at(aggr_idx), column); - } else { - ASSERT(false, "not supported value type"); - } - } else { - ASSERT(false, "not supported aggregation type"); + rc = aggregate_state_update_by_column(aggr_values_.at(aggr_idx), aggregate_expr->aggregate_type(), aggregate_expr->child()->value_type(), column); + if (OB_FAIL(rc)) { + LOG_INFO("failed to update aggregate state. 
rc=%s", strrc(rc)); + return rc; } } } @@ -89,6 +71,7 @@ RC AggregateVecPhysicalOperator::open(Trx *trx) return rc; } + template void AggregateVecPhysicalOperator::update_aggregate_state(void *state, const Column &column) { @@ -99,8 +82,24 @@ void AggregateVecPhysicalOperator::update_aggregate_state(void *state, const Col RC AggregateVecPhysicalOperator::next(Chunk &chunk) { - // your code here - exit(-1); + if (outputed_) { + return RC::RECORD_EOF; + } + for (size_t i = 0; i < aggr_values_.size(); i++) { + auto pos = i; + ASSERT(aggregate_expressions_[pos]->type() == ExprType::AGGREGATION, "expect aggregation expression"); + auto *aggregate_expr = static_cast(aggregate_expressions_[pos]); + RC rc = finialize_aggregate_state(aggr_values_.at(pos), aggregate_expr->aggregate_type(), aggregate_expr->child()->value_type(), output_chunk_.column(i)); + if (OB_FAIL(rc)) { + LOG_INFO("failed to finialize aggregate state. rc=%s", strrc(rc)); + return rc; + } + } + + chunk.reference(output_chunk_); + outputed_ = true; + + return RC::SUCCESS; } RC AggregateVecPhysicalOperator::close() diff --git a/src/observer/sql/operator/aggregate_vec_physical_operator.h b/src/observer/sql/operator/aggregate_vec_physical_operator.h index 48c6a0e1b..1afab13f2 100644 --- a/src/observer/sql/operator/aggregate_vec_physical_operator.h +++ b/src/observer/sql/operator/aggregate_vec_physical_operator.h @@ -33,13 +33,6 @@ class AggregateVecPhysicalOperator : public PhysicalOperator template void update_aggregate_state(void *state, const Column &column); - template - void append_to_column(void *state, Column &column) - { - STATE *state_ptr = reinterpret_cast(state); - column.append_one((char *)&state_ptr->value); - } - private: class AggregateValues { @@ -71,4 +64,5 @@ class AggregateVecPhysicalOperator : public PhysicalOperator Chunk chunk_; Chunk output_chunk_; AggregateValues aggr_values_; + bool outputed_ = false; }; diff --git a/src/observer/sql/operator/explain_physical_operator.cpp b/src/observer/sql/operator/explain_physical_operator.cpp index 409d7fdf5..1ff2933a3 100644 --- a/src/observer/sql/operator/explain_physical_operator.cpp +++ b/src/observer/sql/operator/explain_physical_operator.cpp @@ -55,7 +55,7 @@ RC ExplainPhysicalOperator::next(Chunk &chunk) Value cell(physical_plan_.c_str()); auto column = make_unique(); - column->init(cell); + column->init(cell, chunk.rows()); chunk.add_column(std::move(column), 0); return RC::SUCCESS; } diff --git a/src/observer/sql/operator/expr_vec_physical_operator.cpp b/src/observer/sql/operator/expr_vec_physical_operator.cpp index 3c871de96..64a3a99bb 100644 --- a/src/observer/sql/operator/expr_vec_physical_operator.cpp +++ b/src/observer/sql/operator/expr_vec_physical_operator.cpp @@ -42,6 +42,7 @@ RC ExprVecPhysicalOperator::next(Chunk &chunk) PhysicalOperator &child = *children_[0]; chunk.reset(); evaled_chunk_.reset(); + chunk_.reset(); if (OB_SUCC(rc = child.next(chunk_))) { for (size_t i = 0; i < expressions_.size(); i++) { auto column = make_unique(); diff --git a/src/observer/sql/operator/project_vec_physical_operator.cpp b/src/observer/sql/operator/project_vec_physical_operator.cpp index 6decf2c8b..06a487cfb 100644 --- a/src/observer/sql/operator/project_vec_physical_operator.cpp +++ b/src/observer/sql/operator/project_vec_physical_operator.cpp @@ -43,7 +43,7 @@ RC ProjectVecPhysicalOperator::next(Chunk &chunk) if (children_.empty()) { return RC::RECORD_EOF; } - chunk_.reset_data(); + chunk_.reset(); RC rc = children_[0]->next(chunk_); if (rc == RC::RECORD_EOF) { return 
rc; diff --git a/src/observer/sql/operator/table_scan_vec_physical_operator.cpp b/src/observer/sql/operator/table_scan_vec_physical_operator.cpp index 9f7ace99f..9744776cd 100644 --- a/src/observer/sql/operator/table_scan_vec_physical_operator.cpp +++ b/src/observer/sql/operator/table_scan_vec_physical_operator.cpp @@ -53,8 +53,8 @@ RC TableScanVecPhysicalOperator::next(Chunk &chunk) continue; } for (int j = 0; j < all_columns_.column_num(); j++) { - filterd_columns_.column(j).append_one( - (char *)all_columns_.column(filterd_columns_.column_ids(j)).get_value(i).data()); + filterd_columns_.column(j).append_value( + all_columns_.column(filterd_columns_.column_ids(j)).get_value(i)); } } chunk.reference(filterd_columns_); diff --git a/src/observer/sql/optimizer/logical_plan_generator.cpp b/src/observer/sql/optimizer/logical_plan_generator.cpp index 107b0d441..f9da53f3a 100644 --- a/src/observer/sql/optimizer/logical_plan_generator.cpp +++ b/src/observer/sql/optimizer/logical_plan_generator.cpp @@ -138,12 +138,14 @@ RC LogicalPlanGenerator::create_plan(SelectStmt *select_stmt, unique_ptr(std::move(select_stmt->query_expressions())); + unique_ptr project_oper = make_unique(std::move(select_stmt->query_expressions())); if (*last_oper) { project_oper->add_child(std::move(*last_oper)); } - logical_operator = std::move(project_oper); + last_oper = &project_oper; + + logical_operator = std::move(*last_oper); return RC::SUCCESS; } diff --git a/src/observer/sql/optimizer/physical_plan_generator.cpp b/src/observer/sql/optimizer/physical_plan_generator.cpp index 2c8d9a345..4ce64a24b 100644 --- a/src/observer/sql/optimizer/physical_plan_generator.cpp +++ b/src/observer/sql/optimizer/physical_plan_generator.cpp @@ -113,14 +113,13 @@ RC PhysicalPlanGenerator::create_vec(LogicalOperator &logical_operator, unique_p return create_vec_plan(static_cast(logical_operator), oper, session); } break; default: { + LOG_WARN("unknown logical operator type: %d", logical_operator.type()); return RC::INVALID_ARGUMENT; } } return rc; } - - RC PhysicalPlanGenerator::create_plan(TableGetLogicalOperator &table_get_oper, unique_ptr &oper, Session* session) { vector> &predicates = table_get_oper.predicates(); @@ -133,7 +132,7 @@ RC PhysicalPlanGenerator::create_plan(TableGetLogicalOperator &table_get_oper, u if (expr->type() == ExprType::COMPARISON) { auto comparison_expr = static_cast(expr.get()); // 简单处理,就找等值查询 - if (comparison_expr->comp() != EQUAL_TO) { + if (comparison_expr->comp() != EQUAL_TO && comparison_expr->comp() != NOT_EQUAL) { continue; } diff --git a/src/observer/sql/optimizer/predicate_pushdown_rewriter.cpp b/src/observer/sql/optimizer/predicate_pushdown_rewriter.cpp index 581ac6113..f67cc551d 100644 --- a/src/observer/sql/optimizer/predicate_pushdown_rewriter.cpp +++ b/src/observer/sql/optimizer/predicate_pushdown_rewriter.cpp @@ -119,18 +119,6 @@ RC PredicatePushdownRewriter::get_exprs_can_pushdown( } } else if (expr->type() == ExprType::COMPARISON) { // 如果是比较操作,并且比较的左边或右边是表某个列值,那么就下推下去 - auto comparison_expr = static_cast(expr.get()); - - unique_ptr &left_expr = comparison_expr->left(); - unique_ptr &right_expr = comparison_expr->right(); - // 比较操作的左右两边只要有一个是取列字段值的并且另一边也是取字段值或常量,就pushdown - if (left_expr->type() != ExprType::FIELD && right_expr->type() != ExprType::FIELD) { - return rc; - } - if (left_expr->type() != ExprType::FIELD && left_expr->type() != ExprType::VALUE && - right_expr->type() != ExprType::FIELD && right_expr->type() != ExprType::VALUE) { - return rc; - } 
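The AggregateVecPhysicalOperator changes earlier in this patch follow a two-phase pattern: open() folds every input Chunk into one state object per aggregate expression via update_aggregate_state&lt;STATE, T&gt;, and next() finalizes those states into the output chunk exactly once, guarded by the new outputed_ flag. The snippet below is only a minimal, self-contained sketch of the state-update half; SumState and IntColumn are hypothetical stand-ins written for this example, not the operator's real state or Column types.

```cpp
#include <vector>

// Hypothetical stand-ins for the operator's aggregate state and input column.
struct SumState
{
  int value = 0;
};

struct IntColumn
{
  std::vector<int> data;
  int        count() const { return static_cast<int>(data.size()); }
  const int *values() const { return data.data(); }
};

// Mirrors the shape of update_aggregate_state<STATE, T>: cast the opaque
// state pointer, then accumulate every value of the column into it.
template <class STATE, class T>
void update_aggregate_state(void *state, const IntColumn &column)
{
  STATE   *typed_state = static_cast<STATE *>(state);
  const T *values      = column.values();
  for (int i = 0; i < column.count(); i++) {
    typed_state->value += values[i];
  }
}

int main()
{
  SumState  state;
  IntColumn column{{1, 2, 3, 4}};
  update_aggregate_state<SumState, int>(&state, column);
  return state.value == 10 ? 0 : 1;  // the whole column is folded in one pass
}
```

The real operator additionally dispatches on AggregateExpr::Type and the child value type before instantiating the template, which this sketch omits.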
pushdown_exprs.emplace_back(std::move(expr)); } diff --git a/src/observer/sql/parser/expression_binder.cpp b/src/observer/sql/parser/expression_binder.cpp index 9000756e4..684ecf468 100644 --- a/src/observer/sql/parser/expression_binder.cpp +++ b/src/observer/sql/parser/expression_binder.cpp @@ -370,7 +370,7 @@ RC check_aggregate_expression(AggregateExpr &expression) case AggregateExpr::Type::SUM: case AggregateExpr::Type::AVG: { // 仅支持数值类型 - if (child_value_type != AttrType::INTS && child_value_type != AttrType::FLOATS) { + if (!is_numerical_type(child_value_type)) { LOG_WARN("invalid child value type for aggregate expression: %d", static_cast(child_value_type)); return RC::INVALID_ARGUMENT; } diff --git a/src/observer/sql/parser/lex_sql.l b/src/observer/sql/parser/lex_sql.l index 2b9258ba8..42f7a89f3 100644 --- a/src/observer/sql/parser/lex_sql.l +++ b/src/observer/sql/parser/lex_sql.l @@ -123,6 +123,9 @@ FORMAT RETURN_TOKEN(FORMAT); PRIMARY RETURN_TOKEN(PRIMARY); KEY RETURN_TOKEN(KEY); ANALYZE RETURN_TOKEN(ANALYZE); +FIELDS RETURN_TOKEN(FIELDS); +TERMINATED RETURN_TOKEN(TERMINATED); +ENCLOSED RETURN_TOKEN(ENCLOSED); {ID} yylval->cstring=strdup(yytext); static_cast*>(yyextra)->push_back(yylval->cstring); RETURN_TOKEN(ID); "(" RETURN_TOKEN(LBRACE); ")" RETURN_TOKEN(RBRACE); diff --git a/src/observer/sql/parser/parse_defs.h b/src/observer/sql/parser/parse_defs.h index 4a01b64f6..8880432b3 100644 --- a/src/observer/sql/parser/parse_defs.h +++ b/src/observer/sql/parser/parse_defs.h @@ -223,6 +223,8 @@ struct LoadDataSqlNode { string relation_name; string file_name; + string terminated = ","; + string enclosed = "\""; }; /** diff --git a/src/observer/sql/parser/yacc_sql.y b/src/observer/sql/parser/yacc_sql.y index 83a909bb3..0d14fa383 100644 --- a/src/observer/sql/parser/yacc_sql.y +++ b/src/observer/sql/parser/yacc_sql.y @@ -108,6 +108,9 @@ UnboundAggregateExpr *create_aggregate_expression(const char *aggregate_name, PRIMARY KEY ANALYZE + FIELDS + TERMINATED + ENCLOSED EQ LT GT @@ -160,8 +163,11 @@ UnboundAggregateExpr *create_aggregate_expression(const char *aggregate_name, %type attr_list %type rel_list %type expression +%type aggregate_expression %type expression_list %type group_by +%type fields_terminated_by +%type enclosed_by %type calc_stmt %type select_stmt %type insert_stmt @@ -549,6 +555,9 @@ expression: | '-' expression %prec UMINUS { $$ = create_arithmetic_expression(ArithmeticExpr::Type::NEGATIVE, $2, nullptr, sql_string, &@$); } + | '*' { + $$ = new StarExpr(); + } | value { $$ = new ValueExpr(*$1); $$->set_name(token_name(sql_string, &@$)); @@ -560,10 +569,15 @@ expression: $$->set_name(token_name(sql_string, &@$)); delete $1; } - | '*' { - $$ = new StarExpr(); + | aggregate_expression { + $$ = $1; + } + ; + +aggregate_expression: + ID LBRACE expression RBRACE { + $$ = create_aggregate_expression($1, $3, sql_string, &@$); } - // your code here ; rel_attr: @@ -690,19 +704,55 @@ group_by: { $$ = nullptr; } + | GROUP BY expression_list + { + // group by 的表达式范围与select查询值的表达式范围是不同的,比如group by不支持 * + // 但是这里没有处理。 + $$ = $3; + } ; load_data_stmt: - LOAD DATA INFILE SSS INTO TABLE ID + LOAD DATA INFILE SSS INTO TABLE ID fields_terminated_by enclosed_by { char *tmp_file_name = common::substr($4, 1, strlen($4) - 2); $$ = new ParsedSqlNode(SCF_LOAD_DATA); $$->load_data.relation_name = $7; $$->load_data.file_name = tmp_file_name; + if ($8 != nullptr) { + char *tmp = common::substr($8,1,strlen($8)-2); + $$->load_data.terminated = $8; + free(tmp); + } + if ($9 != nullptr) { + char *tmp = 
common::substr($9,1,strlen($9)-2); + $$->load_data.enclosed = $9; + free(tmp); + } free(tmp_file_name); } ; +fields_terminated_by: + /* empty */ + { + $$ = nullptr; + } + | FIELDS TERMINATED BY SSS + { + $$ = $4; + }; + +enclosed_by: + /* empty */ + { + $$ = nullptr; + } + | ENCLOSED BY SSS + { + $$ = $3; + }; + explain_stmt: EXPLAIN command_wrapper { diff --git a/src/observer/sql/stmt/load_data_stmt.cpp b/src/observer/sql/stmt/load_data_stmt.cpp index 530faa42a..37335dfa7 100644 --- a/src/observer/sql/stmt/load_data_stmt.cpp +++ b/src/observer/sql/stmt/load_data_stmt.cpp @@ -43,6 +43,15 @@ RC LoadDataStmt::create(Db *db, const LoadDataSqlNode &load_data, Stmt *&stmt) return RC::FILE_NOT_EXIST; } - stmt = new LoadDataStmt(table, load_data.file_name.c_str()); + if (load_data.enclosed.size() != 3) { + LOG_WARN("load data invalid enclosed. enclosed=%s", load_data.enclosed.c_str()); + return RC::INVALID_ARGUMENT; + } + if (load_data.terminated.size() != 3) { + LOG_WARN("load data invalid terminated. terminated=%s", load_data.terminated.c_str()); + return RC::INVALID_ARGUMENT; + } + + stmt = new LoadDataStmt(table, load_data.file_name.c_str(), load_data.terminated[1], load_data.enclosed[1]); return rc; } diff --git a/src/observer/sql/stmt/load_data_stmt.h b/src/observer/sql/stmt/load_data_stmt.h index 1308159b7..0c0b7e928 100644 --- a/src/observer/sql/stmt/load_data_stmt.h +++ b/src/observer/sql/stmt/load_data_stmt.h @@ -21,17 +21,23 @@ class Table; class LoadDataStmt : public Stmt { public: - LoadDataStmt(Table *table, const char *filename) : table_(table), filename_(filename) {} + LoadDataStmt(Table *table, const char *filename, + const char terminated, const char enclosed) + : table_(table), filename_(filename), terminated_(terminated), enclosed_(enclosed) {} virtual ~LoadDataStmt() = default; StmtType type() const override { return StmtType::LOAD_DATA; } Table *table() const { return table_; } const char *filename() const { return filename_.c_str(); } + const char terminated() const { return terminated_; } + const char enclosed() const { return enclosed_; } static RC create(Db *db, const LoadDataSqlNode &load_data, Stmt *&stmt); private: Table *table_ = nullptr; string filename_; + char terminated_; + char enclosed_; }; diff --git a/src/observer/storage/buffer/disk_buffer_pool.h b/src/observer/storage/buffer/disk_buffer_pool.h index d41d1bd0c..4f6ee8f7a 100644 --- a/src/observer/storage/buffer/disk_buffer_pool.h +++ b/src/observer/storage/buffer/disk_buffer_pool.h @@ -71,8 +71,7 @@ struct BPFileHeader /** * 能够分配的最大的页面个数,即bitmap的字节数 乘以8 */ - static const int MAX_PAGE_NUM = - (BP_PAGE_DATA_SIZE - sizeof(buffer_pool_id) - sizeof(page_count) - sizeof(allocated_pages)) * 8; + static const int MAX_PAGE_NUM = (BP_PAGE_DATA_SIZE - sizeof(buffer_pool_id) - sizeof(page_count) - sizeof(allocated_pages)) * 8; string to_string() const; }; diff --git a/src/observer/storage/common/arena_allocator.cpp b/src/observer/storage/common/arena_allocator.cpp new file mode 100644 index 000000000..342392a00 --- /dev/null +++ b/src/observer/storage/common/arena_allocator.cpp @@ -0,0 +1,72 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. 
+You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "storage/common/arena_allocator.h" + +static const int kBlockSize = 4096; + +Arena::Arena() + : alloc_ptr_(nullptr), alloc_bytes_remaining_(0), memory_usage_(0) {} + +Arena::~Arena() { + for (size_t i = 0; i < blocks_.size(); i++) { + delete[] blocks_[i]; + } +} + +char* Arena::AllocateFallback(size_t bytes) { + if (bytes > kBlockSize / 4) { + // Object is more than a quarter of our block size. Allocate it separately + // to avoid wasting too much space in leftover bytes. + char* result = AllocateNewBlock(bytes); + return result; + } + + // We waste the remaining space in the current block. + alloc_ptr_ = AllocateNewBlock(kBlockSize); + alloc_bytes_remaining_ = kBlockSize; + + char* result = alloc_ptr_; + alloc_ptr_ += bytes; + alloc_bytes_remaining_ -= bytes; + return result; +} + +char* Arena::AllocateAligned(size_t bytes) { + const int align = (sizeof(void*) > 8) ? sizeof(void*) : 8; + static_assert((align & (align - 1)) == 0, + "Pointer size should be a power of 2"); + size_t current_mod = reinterpret_cast(alloc_ptr_) & (align - 1); + size_t slop = (current_mod == 0 ? 0 : align - current_mod); + size_t needed = bytes + slop; + char* result; + if (needed <= alloc_bytes_remaining_) { + result = alloc_ptr_ + slop; + alloc_ptr_ += needed; + alloc_bytes_remaining_ -= needed; + } else { + // AllocateFallback always returned aligned memory + result = AllocateFallback(bytes); + } + assert((reinterpret_cast(result) & (align - 1)) == 0); + return result; +} + +char* Arena::AllocateNewBlock(size_t block_bytes) { + char* result = new char[block_bytes]; + blocks_.push_back(result); + memory_usage_.fetch_add(block_bytes + sizeof(char*), + std::memory_order_relaxed); + return result; +} diff --git a/src/observer/storage/common/arena_allocator.h b/src/observer/storage/common/arena_allocator.h new file mode 100644 index 000000000..b93be4c52 --- /dev/null +++ b/src/observer/storage/common/arena_allocator.h @@ -0,0 +1,74 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once + +#include +#include +#include +#include +#include + +class Arena { + public: + Arena(); + + Arena(const Arena&) = delete; + Arena& operator=(const Arena&) = delete; + + ~Arena(); + + // Return a pointer to a newly allocated memory block of "bytes" bytes. 
+ char* Allocate(size_t bytes); + + // Allocate memory with the normal alignment guarantees provided by malloc. + char* AllocateAligned(size_t bytes); + + // Returns an estimate of the total memory usage of data allocated + // by the arena. + size_t MemoryUsage() const { + return memory_usage_.load(std::memory_order_relaxed); + } + + private: + char* AllocateFallback(size_t bytes); + char* AllocateNewBlock(size_t block_bytes); + + // Allocation state + char* alloc_ptr_; + size_t alloc_bytes_remaining_; + + // Array of new[] allocated memory blocks + std::vector blocks_; + + // Total memory usage of the arena. + // + // TODO(costan): This member is accessed via atomics, but the others are + // accessed without any locking. Is this OK? + std::atomic memory_usage_; +}; + +inline char* Arena::Allocate(size_t bytes) { + // The semantics of what to return are a bit messy if we allow + // 0-byte allocations, so we disallow them here (we don't need + // them for our internal use). + assert(bytes > 0); + if (bytes <= alloc_bytes_remaining_) { + char* result = alloc_ptr_; + alloc_ptr_ += bytes; + alloc_bytes_remaining_ -= bytes; + return result; + } + return AllocateFallback(bytes); +} diff --git a/src/observer/storage/common/chunk.h b/src/observer/storage/common/chunk.h index b32e514c1..32a4ec11e 100644 --- a/src/observer/storage/common/chunk.h +++ b/src/observer/storage/common/chunk.h @@ -22,9 +22,20 @@ See the Mulan PSL v2 for more details. */ class Chunk { public: + static const int MAX_ROWS = Column::DEFAULT_CAPACITY; Chunk() = default; - Chunk(const Chunk &) = delete; - Chunk(Chunk &&) = delete; + Chunk(const Chunk &other) + { + for (size_t i = 0; i < other.columns_.size(); ++i) { + columns_.emplace_back(other.columns_[i]->clone()); + } + column_ids_ = other.column_ids_; + } + Chunk(Chunk && chunk) + { + columns_ = std::move(chunk.columns_); + column_ids_ = std::move(chunk.column_ids_); + } int column_num() const { return columns_.size(); } @@ -34,6 +45,12 @@ class Chunk return *columns_[idx]; } + const Column &column(size_t idx) const + { + ASSERT(idx < columns_.size(), "invalid column index"); + return *columns_[idx]; + } + Column *column_ptr(size_t idx) { ASSERT(idx < columns_.size(), "invalid column index"); diff --git a/src/observer/storage/common/column.cpp b/src/observer/storage/common/column.cpp index 19dc12f3d..a81cf17f7 100644 --- a/src/observer/storage/common/column.cpp +++ b/src/observer/storage/common/column.cpp @@ -22,6 +22,7 @@ Column::Column(const FieldMeta &meta, size_t size) { // TODO: optimized the memory usage if it doesn't need to allocate memory data_ = new char[size * attr_len_]; + memset(data_, 0, size * attr_len_); capacity_ = size; } @@ -30,6 +31,7 @@ Column::Column(AttrType attr_type, int attr_len, size_t capacity) attr_type_ = attr_type; attr_len_ = attr_len; data_ = new char[capacity * attr_len_]; + memset(data_, 0, capacity * attr_len_); count_ = 0; capacity_ = capacity; own_ = true; @@ -40,6 +42,7 @@ void Column::init(const FieldMeta &meta, size_t size) { reset(); data_ = new char[size * meta.len()]; + memset(data_, 0, size * meta.len()); count_ = 0; capacity_ = size; attr_type_ = meta.type(); @@ -52,6 +55,7 @@ void Column::init(AttrType attr_type, int attr_len, size_t capacity) { reset(); data_ = new char[capacity * attr_len]; + memset(data_, 0, capacity * attr_len); count_ = 0; capacity_ = capacity; own_ = true; @@ -60,13 +64,18 @@ void Column::init(AttrType attr_type, int attr_len, size_t capacity) column_type_ = Type::NORMAL_COLUMN; } -void Column::init(const 
Value &value) +void Column::init(const Value &value, size_t size) { reset(); attr_type_ = value.attr_type(); attr_len_ = value.length(); - data_ = new char[attr_len_]; - count_ = 1; + if (attr_len_ == 0) { + data_ = new char[1]; + data_[0] = '\0'; + } else { + data_ = new char[attr_len_]; + } + count_ = size; capacity_ = 1; own_ = true; memcpy(data_, value.data(), attr_len_); @@ -75,6 +84,9 @@ void Column::init(const Value &value) void Column::reset() { + if (vector_buffer_ != nullptr) { + vector_buffer_ = nullptr; + } if (data_ != nullptr && own_) { delete[] data_; } @@ -86,9 +98,9 @@ void Column::reset() attr_len_ = -1; } -RC Column::append_one(char *data) { return append(data, 1); } +RC Column::append_one(const char *data) { return append(data, 1); } -RC Column::append(char *data, int count) +RC Column::append(const char *data, int count) { if (!own_) { LOG_WARN("append data to non-owned column"); @@ -106,8 +118,37 @@ RC Column::append(char *data, int count) return RC::SUCCESS; } +RC Column::append_value(const Value &value) +{ + if (!own_) { + LOG_WARN("append data to non-owned column"); + return RC::INTERNAL; + } + if (count_ >= capacity_) { + LOG_WARN("append data to full column"); + return RC::INTERNAL; + } + + size_t total_bytes = std::min(value.length(), attr_len_); + memcpy(data_ + count_ * attr_len_, value.data(), total_bytes); + + count_ += 1; + return RC::SUCCESS; +} + +string_t Column::add_text(const char *data, int length) +{ + if (vector_buffer_ == nullptr) { + vector_buffer_ = make_unique(); + } + return vector_buffer_->add_string(data, length); +} + Value Column::get_value(int index) const { + if (column_type_ == Type::CONSTANT_COLUMN) { + index = 0; + } if (index >= count_ || index < 0) { return Value(); } diff --git a/src/observer/storage/common/column.h b/src/observer/storage/common/column.h index 122fb854f..10782ec68 100644 --- a/src/observer/storage/common/column.h +++ b/src/observer/storage/common/column.h @@ -13,6 +13,7 @@ See the Mulan PSL v2 for more details. */ #include #include "storage/field/field_meta.h" +#include "storage/common/vector_buffer.h" /** * @brief A column contains multiple values in contiguous memory with a specified type. 
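The Column changes above, together with the header hunk that follows, move the class from raw-pointer appends to Value-aware ones: append_value() copies at most attr_len_ bytes out of a Value, init(const Value &, size) stores a single value while reporting size rows (which ExplainPhysicalOperator uses to match the chunk's row count), and add_text() parks long strings in a per-column VectorBuffer arena. Below is a rough usage sketch assuming the usual MiniOB include paths; fill_column_with_ints is a hypothetical helper written only for this example.

```cpp
// A minimal sketch (not part of the patch) of the Value-based Column API
// introduced here; relies on the MiniOB build putting these headers on the
// include path.
#include "common/value.h"
#include "storage/common/column.h"

RC fill_column_with_ints()
{
  Column col;
  col.init(AttrType::INTS, sizeof(int), /*capacity=*/16);  // owning, zero-initialized buffer

  for (int i = 0; i < 4; i++) {
    Value v(i);
    RC rc = col.append_value(v);  // copies min(value length, attr_len_) bytes
    if (rc != RC::SUCCESS) {
      return rc;
    }
  }

  Value third = col.get_value(2);  // reads a value back out of the column
  (void)third;
  return RC::SUCCESS;
}
```

The copy constructor and clone() added in the header hunk are what allow Chunk to be copied as well, which the new Chunk copy constructor relies on.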
@@ -28,34 +29,74 @@ class Column }; Column() = default; - Column(const Column &) = delete; - Column(Column &&) = delete; + + Column(const Column &other) + { + count_ = other.count_; + capacity_ = other.capacity_; + own_ = true; + attr_type_ = other.attr_type_; + attr_len_ = other.attr_len_; + column_type_ = other.column_type_; + data_ = new char[capacity_ * attr_len_]; + memcpy(data_, other.data_, capacity_ * attr_len_); + vector_buffer_ = make_unique(); + + } + Column(Column && other) + { + data_ = other.data_; + count_ = other.count_; + capacity_ = other.capacity_; + own_ = other.own_; + attr_type_ = other.attr_type_; + attr_len_ = other.attr_len_; + column_type_ = other.column_type_; + vector_buffer_ = std::move(other.vector_buffer_); + other.data_ = nullptr; + other.count_ = 0; + other.capacity_ = 0; + other.own_ = false; + } Column(const FieldMeta &meta, size_t size = DEFAULT_CAPACITY); Column(AttrType attr_type, int attr_len, size_t size = DEFAULT_CAPACITY); void init(const FieldMeta &meta, size_t size = DEFAULT_CAPACITY); void init(AttrType attr_type, int attr_len, size_t size = DEFAULT_CAPACITY); - void init(const Value &value); + void init(const Value &value, size_t size); + + unique_ptr clone() const + { + return make_unique(*this); + } virtual ~Column() { reset(); } void reset(); - RC append_one(char *data); + RC append_one(const char *data); + + RC append_value(const Value& val); /** * @brief 向 Column 追加写入数据 * @param data 要被写入数据的起始地址 * @param count 要写入数据的长度(这里指列值的个数,而不是字节) */ - RC append(char *data, int count); + RC append(const char *data, int count); /** * @brief 获取 index 位置的列值 */ Value get_value(int index) const; + RC copy_to(void* dest, int start_rows, int insert_rows) const + { + memcpy(dest, data_ + start_rows * attr_len_, insert_rows * attr_len_); + return RC::SUCCESS; + } + /** * @brief 获取列数据的实际大小(字节) */ @@ -63,10 +104,16 @@ class Column char *data() const { return data_; } + string_t add_text(const char* str, int len); + /** * @brief 重置列数据,但不修改元信息 */ - void reset_data() { count_ = 0; } + void reset_data() + { + count_ = 0; + vector_buffer_ = nullptr; + } /** * @brief 引用另一个 Column @@ -74,6 +121,7 @@ class Column void reference(const Column &column); void set_column_type(Type column_type) { column_type_ = column_type; } + void set_attr_type(AttrType attr_type) { attr_type_ = attr_type; } void set_count(int count) { count_ = count; } int count() const { return count_; } @@ -81,9 +129,8 @@ class Column AttrType attr_type() const { return attr_type_; } int attr_len() const { return attr_len_; } Type column_type() const { return column_type_; } - -private: static constexpr size_t DEFAULT_CAPACITY = 8192; +private: char *data_ = nullptr; /// 当前列值数量 @@ -98,4 +145,5 @@ class Column int attr_len_ = -1; /// 列类型 Type column_type_ = Type::NORMAL_COLUMN; + unique_ptr vector_buffer_ = nullptr; }; \ No newline at end of file diff --git a/src/observer/storage/common/meta_util.cpp b/src/observer/storage/common/meta_util.cpp index 267d0adf4..e11842d9a 100644 --- a/src/observer/storage/common/meta_util.cpp +++ b/src/observer/storage/common/meta_util.cpp @@ -34,3 +34,8 @@ string table_index_file(const char *base_dir, const char *table_name, const char { return filesystem::path(base_dir) / (string(table_name) + "-" + index_name + TABLE_INDEX_SUFFIX); } + +string table_lob_file(const char *base_dir, const char *table_name) +{ + return filesystem::path(base_dir) / (string(table_name) + TABLE_LOB_SUFFIX); +} \ No newline at end of file diff --git a/src/observer/storage/common/meta_util.h 
b/src/observer/storage/common/meta_util.h index 2eaacd619..0a299a3e1 100644 --- a/src/observer/storage/common/meta_util.h +++ b/src/observer/storage/common/meta_util.h @@ -20,8 +20,10 @@ static constexpr const char *TABLE_META_SUFFIX = ".table"; static constexpr const char *TABLE_META_FILE_PATTERN = ".*\\.table$"; static constexpr const char *TABLE_DATA_SUFFIX = ".data"; static constexpr const char *TABLE_INDEX_SUFFIX = ".index"; +static constexpr const char *TABLE_LOB_SUFFIX = ".lob"; string db_meta_file(const char *base_dir, const char *db_name); string table_meta_file(const char *base_dir, const char *table_name); string table_data_file(const char *base_dir, const char *table_name); string table_index_file(const char *base_dir, const char *table_name, const char *index_name); +string table_lob_file(const char *base_dir, const char *table_name); diff --git a/src/observer/storage/common/vector_buffer.h b/src/observer/storage/common/vector_buffer.h new file mode 100644 index 000000000..bbf7498ec --- /dev/null +++ b/src/observer/storage/common/vector_buffer.h @@ -0,0 +1,33 @@ +#pragma once + +#include "storage/common/arena_allocator.h" +#include "common/type/string_t.h" +#include "common/log/log.h" + +class VectorBuffer { +public: + VectorBuffer() = default; + + string_t add_string(const char *data, int len) { + if (len <= string_t::INLINE_LENGTH) { + return string_t(data, len); + } + auto insert_string = empty_string(len); + auto insert_pos = insert_string.get_data_writeable(); + memcpy(insert_pos, data, len); + return insert_string; + } + + string_t add_string(string_t data) { + return add_string(data.data(), data.size()); + } + + string_t empty_string(int len) { + ASSERT(len > string_t::INLINE_LENGTH, "len > string_t::INLINE_LENGTH"); + auto insert_pos = heap.Allocate(len); + return string_t(insert_pos, len); + } + +private: + Arena heap; +}; \ No newline at end of file diff --git a/src/observer/storage/persist/persist.cpp b/src/observer/storage/persist/persist.cpp index d819647a3..866a95878 100644 --- a/src/observer/storage/persist/persist.cpp +++ b/src/observer/storage/persist/persist.cpp @@ -39,7 +39,7 @@ RC PersistHandler::create_file(const char *file_name) rc = RC::FILE_CREATE; } else { file_name_ = file_name; - close(fd); + file_desc_ = fd; LOG_INFO("Successfully create %s.", file_name); } } @@ -178,7 +178,7 @@ RC PersistHandler::write_at(uint64_t offset, int size, const char *data, int64_t return rc; } -RC PersistHandler::append(int size, const char *data, int64_t *out_size) +RC PersistHandler::append(int size, const char *data, int64_t *out_size, int64_t *out_offset) { RC rc = RC::SUCCESS; if (file_name_.empty()) { @@ -188,7 +188,8 @@ RC PersistHandler::append(int size, const char *data, int64_t *out_size) LOG_ERROR("Failed to append, because file is not opened."); rc = RC::FILE_NOT_OPENED; } else { - if (lseek(file_desc_, 0, SEEK_END) == off_t(-1)) { + off_t end_offset = lseek(file_desc_, 0, SEEK_END); + if (end_offset == off_t(-1)) { LOG_ERROR("Failed to append file %d:%s due to failed to seek: %s.", file_desc_, file_name_.c_str(), strerror(errno)); rc = RC::IOERR_SEEK; @@ -202,6 +203,9 @@ RC PersistHandler::append(int size, const char *data, int64_t *out_size) if (out_size != nullptr) { *out_size = write_size; } + if (out_offset != nullptr) { + *out_offset = static_cast(end_offset); + } } } diff --git a/src/observer/storage/persist/persist.h b/src/observer/storage/persist/persist.h index 0dde544af..45f61f7dc 100644 --- a/src/observer/storage/persist/persist.h +++ 
b/src/observer/storage/persist/persist.h @@ -49,7 +49,7 @@ class PersistHandler RC write_at(uint64_t offset, int size, const char *data, int64_t *out_size = nullptr); /** 在文件末尾写入一段数据,并返回实际写入的数据大小out_size */ - RC append(int size, const char *data, int64_t *out_size = nullptr); + RC append(int size, const char *data, int64_t *out_size = nullptr, int64_t *out_offset = nullptr); /** 在当前文件描述符的位置读取一段数据,并返回实际读取的数据大小out_size */ RC read_file(int size, char *data, int64_t *out_size = nullptr); diff --git a/src/observer/storage/record/lob_handler.cpp b/src/observer/storage/record/lob_handler.cpp new file mode 100644 index 000000000..b7905170e --- /dev/null +++ b/src/observer/storage/record/lob_handler.cpp @@ -0,0 +1,44 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. */ + +#include "storage/record/lob_handler.h" + +RC LobFileHandler::create_file(const char *file_name) +{ + return file_.create_file(file_name); +} + +RC LobFileHandler::open_file(const char *file_name) +{ + std::ifstream file(file_name); + if (file.good()) { + return file_.open_file(file_name); + } else { + return RC::FILE_NOT_EXIST; + } + return RC::INTERNAL; +} + +RC LobFileHandler::insert_data(int64_t &offset, int64_t length, const char *data) +{ + RC rc = RC::SUCCESS; + int64_t out_size = 0; + int64_t end_offset = 0; + rc = file_.append(length, data, &out_size, &end_offset); + if (OB_FAIL(rc)) { + return rc; + } + if (out_size != length) { + return RC::IOERR_WRITE; + } + offset = end_offset; + + return rc; +} diff --git a/src/observer/storage/record/lob_handler.h b/src/observer/storage/record/lob_handler.h new file mode 100644 index 000000000..5db6b17b2 --- /dev/null +++ b/src/observer/storage/record/lob_handler.h @@ -0,0 +1,41 @@ +/* Copyright (c) 2021 OceanBase and/or its affiliates. All rights reserved. +miniob is licensed under Mulan PSL v2. +You can use this software according to the terms and conditions of the Mulan PSL v2. +You may obtain a copy of Mulan PSL v2 at: + http://license.coscl.org.cn/MulanPSL2 +THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +See the Mulan PSL v2 for more details. 
*/ + +#pragma once + +#include "common/lang/mutex.h" +#include "common/lang/sstream.h" +#include "common/types.h" +#include "storage/persist/persist.h" + +/** + * @brief 管理LOB文件中的 LOB 对象 + * @ingroup RecordManager + */ +class LobFileHandler +{ +public: + LobFileHandler() {} + + ~LobFileHandler() { close_file(); } + + RC create_file(const char *file_name); + + RC open_file(const char *file_name); + + RC close_file() { return file_.close_file(); } + + RC insert_data(int64_t &offset, int64_t length, const char *data); + + RC get_data(int64_t offset, int64_t length, char *data) { return file_.read_at(offset, length, data); } + +private: + PersistHandler file_; +}; \ No newline at end of file diff --git a/src/observer/storage/record/record_manager.cpp b/src/observer/storage/record/record_manager.cpp index cd8a69149..87e5a6105 100644 --- a/src/observer/storage/record/record_manager.cpp +++ b/src/observer/storage/record/record_manager.cpp @@ -95,7 +95,8 @@ RC RecordPageIterator::next(Record &record) RecordPageHandler::~RecordPageHandler() { cleanup(); } -RC RecordPageHandler::init(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, ReadWriteMode mode) +RC RecordPageHandler::init(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, ReadWriteMode mode, + LobFileHandler *lob_handler) { if (disk_buffer_pool_ != nullptr) { if (frame_->page_num() == page_num) { @@ -105,6 +106,7 @@ RC RecordPageHandler::init(DiskBufferPool &buffer_pool, LogHandler &log_handler, cleanup(); } } + lob_handler_ = lob_handler; RC ret = RC::SUCCESS; if ((ret = buffer_pool.get_this_page(page_num, &frame_)) != RC::SUCCESS) { @@ -158,10 +160,11 @@ RC RecordPageHandler::recover_init(DiskBufferPool &buffer_pool, PageNum page_num return ret; } -RC RecordPageHandler::init_empty_page( - DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, int record_size, TableMeta *table_meta) +RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, + int record_size, TableMeta *table_meta, LobFileHandler *lob_handler) { - RC rc = init(buffer_pool, log_handler, page_num, ReadWriteMode::READ_WRITE); + RC rc = init(buffer_pool, log_handler, page_num, ReadWriteMode::READ_WRITE); + lob_handler_ = lob_handler; if (OB_FAIL(rc)) { LOG_ERROR("Failed to init empty page page_num:record_size %d:%d. rc=%s", page_num, record_size, strrc(rc)); return rc; @@ -191,6 +194,8 @@ RC RecordPageHandler::init_empty_page( bitmap_ = frame_->data() + PAGE_HEADER_SIZE; memset(bitmap_, 0, page_bitmap_size(page_header_->record_capacity)); // column_index[i] store the end offset of column `i` or the start offset of column `i+1` + + // 计算列偏移 int *column_index = reinterpret_cast(frame_->data() + page_header_->col_idx_offset); for (int i = 0; i < column_num; ++i) { ASSERT(i == table_meta->field(i)->field_id(), "i should be the col_id of fields[i]"); @@ -212,13 +217,14 @@ RC RecordPageHandler::init_empty_page( } RC RecordPageHandler::init_empty_page(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, - int record_size, int column_num, const char *col_idx_data) + int record_size, int column_num, const char *col_idx_data, LobFileHandler *lob_handler) { RC rc = init(buffer_pool, log_handler, page_num, ReadWriteMode::READ_WRITE); if (OB_FAIL(rc)) { LOG_ERROR("Failed to init empty page page_num:record_size %d:%d. 
rc=%s", page_num, record_size, strrc(rc)); return rc; } + lob_handler_ = lob_handler; (void)log_handler_.init(log_handler, buffer_pool.id(), record_size, storage_format_); @@ -282,6 +288,7 @@ RC RowRecordPageHandler::insert_record(const char *data, RID *rid) bitmap.set_bit(index); page_header_->record_num++; + // 记录日志,与数据库恢复相关 RC rc = log_handler_.insert_record(frame_, RID(get_page_num(), index), data); if (OB_FAIL(rc)) { LOG_ERROR("Failed to insert record. page_num %d:%d. rc=%s", disk_buffer_pool_->file_desc(), frame_->page_num(), strrc(rc)); @@ -417,7 +424,17 @@ bool RecordPageHandler::is_full() const { return page_header_->record_num >= pag RC PaxRecordPageHandler::insert_record(const char *data, RID *rid) { // your code here - exit(-1); + // Todo: + // 1.参考RowRecordPageHandler::insert_record完成大体实现 + // 2.将一行数据拆分成不同的列插入到不同偏移中 + // 对应列的偏移可以参照RecordPageHandler::init_empty_page + return RC::UNIMPLEMENTED; +} + +RC PaxRecordPageHandler::insert_chunk(const Chunk &chunk, int start_row, int &insert_rows) +{ + // your code here + return RC::UNIMPLEMENTED; } RC PaxRecordPageHandler::delete_record(const RID *rid) @@ -447,14 +464,21 @@ RC PaxRecordPageHandler::delete_record(const RID *rid) RC PaxRecordPageHandler::get_record(const RID &rid, Record &record) { // your code here - exit(-1); + // Todo: + // 1.参考RowRecordPageHandler::get_record完成大体实现 + // 2.通过列的偏移拼接出完整的行数据 + // 可以参照PaxRecordPageHandler::insert_record的实现 + return RC::UNIMPLEMENTED; } // TODO: specify the column_ids that chunk needed. currenly we get all columns RC PaxRecordPageHandler::get_chunk(Chunk &chunk) { // your code here - exit(-1); + // Todo: + // 参照PaxRecordPageHandler::get_record + // 一次性获得一个page的所有record + return RC::UNIMPLEMENTED; } char *PaxRecordPageHandler::get_field_data(SlotNum slot_num, int col_id) @@ -481,7 +505,8 @@ int PaxRecordPageHandler::get_field_len(int col_id) RecordFileHandler::~RecordFileHandler() { this->close(); } -RC RecordFileHandler::init(DiskBufferPool &buffer_pool, LogHandler &log_handler, TableMeta *table_meta) +RC RecordFileHandler::init( + DiskBufferPool &buffer_pool, LogHandler &log_handler, TableMeta *table_meta, LobFileHandler *lob_handler) { if (disk_buffer_pool_ != nullptr) { LOG_ERROR("record file handler has been openned."); @@ -491,6 +516,7 @@ RC RecordFileHandler::init(DiskBufferPool &buffer_pool, LogHandler &log_handler, disk_buffer_pool_ = &buffer_pool; log_handler_ = &log_handler; table_meta_ = table_meta; + lob_handler_ = lob_handler; RC rc = init_free_pages(); @@ -581,7 +607,7 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid) current_page_num = frame->page_num(); ret = record_page_handler->init_empty_page( - *disk_buffer_pool_, *log_handler_, current_page_num, record_size, table_meta_); + *disk_buffer_pool_, *log_handler_, current_page_num, record_size, table_meta_, lob_handler_); if (OB_FAIL(ret)) { frame->unpin(); LOG_ERROR("Failed to init empty page. 
ret:%d", ret); @@ -605,6 +631,12 @@ RC RecordFileHandler::insert_record(const char *data, int record_size, RID *rid) return record_page_handler->insert_record(data, rid); } +RC RecordFileHandler::insert_chunk(const Chunk &chunk, int record_size) +{ + // your code here + return RC::UNIMPLEMENTED; +} + RC RecordFileHandler::recover_insert_record(const char *data, int record_size, const RID &rid) { RC ret = RC::SUCCESS; @@ -748,7 +780,7 @@ RC ChunkFileScanner::next_chunk(Chunk &chunk) while (bp_iterator_.has_next()) { PageNum page_num = bp_iterator_.next(); record_page_handler_->cleanup(); - rc = record_page_handler_->init(*disk_buffer_pool_, *log_handler_, page_num, rw_mode_); + rc = record_page_handler_->init(*disk_buffer_pool_, *log_handler_, page_num, rw_mode_, table_->lob_handler()); if (OB_FAIL(rc)) { LOG_WARN("failed to init record page handler. page_num=%d, rc=%s", page_num, strrc(rc)); return rc; diff --git a/src/observer/storage/record/record_manager.h b/src/observer/storage/record/record_manager.h index ced318832..05eddaf0b 100644 --- a/src/observer/storage/record/record_manager.h +++ b/src/observer/storage/record/record_manager.h @@ -20,6 +20,7 @@ See the Mulan PSL v2 for more details. */ #include "storage/common/chunk.h" #include "storage/record/record.h" #include "storage/record/record_log.h" +#include "storage/record/lob_handler.h" #include "common/types.h" class LogHandler; @@ -136,7 +137,7 @@ class RecordPageHandler * @param page_num 当前处理哪个页面 * @param mode 是否只读。在访问页面时,需要对页面加锁 */ - RC init(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, ReadWriteMode mode); + RC init(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, ReadWriteMode mode, LobFileHandler* lob_handler = nullptr); /** * @brief 数据库恢复时,与普通的运行场景有所不同,不做任何并发操作,也不需要加锁 @@ -155,7 +156,8 @@ class RecordPageHandler * @param table_meta 表的元数据 */ RC init_empty_page( - DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, int record_size, TableMeta *table_meta); + DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, int record_size, TableMeta *table_meta, + LobFileHandler* lob_handler = nullptr); /** * @brief 对一个新的页面做初始化,初始化关于该页面记录信息的页头PageHeader,该函数用于日志回放时。 @@ -166,7 +168,7 @@ class RecordPageHandler * @param col_idx_data 列索引数据 */ RC init_empty_page(DiskBufferPool &buffer_pool, LogHandler &log_handler, PageNum page_num, int record_size, - int col_num, const char *col_idx_data); + int col_num, const char *col_idx_data, LobFileHandler* lob_handler = nullptr); /** * @brief 操作结束后做的清理工作,比如释放页面、解锁 @@ -181,6 +183,8 @@ class RecordPageHandler */ virtual RC insert_record(const char *data, RID *rid) { return RC::UNIMPLEMENTED; } + virtual RC insert_chunk(const Chunk& chunk, int start_row, int& insert_rows) { return RC::UNIMPLEMENTED; } + /** * @brief 数据库恢复时,在指定位置插入数据 * @@ -262,6 +266,7 @@ class RecordPageHandler PageHeader *page_header_ = nullptr; ///< 当前页面上页面头 char *bitmap_ = nullptr; ///< 当前页面上record分配状态信息bitmap内存起始位置 StorageFormat storage_format_; + LobFileHandler* lob_handler_ = nullptr; protected: friend class RecordPageIterator; @@ -324,6 +329,9 @@ class PaxRecordPageHandler : public RecordPageHandler */ virtual RC insert_record(const char *data, RID *rid) override; + // TODO: insert chunk only used in load_data + virtual RC insert_chunk(const Chunk& chunk, int start_row, int& insert_rows) override; + virtual RC delete_record(const RID *rid) override; /** @@ -365,7 +373,7 @@ class RecordFileHandler * * @param buffer_pool 当前操作的是哪个文件 */ - RC 
init(DiskBufferPool &buffer_pool, LogHandler &log_handler, TableMeta *table_meta); + RC init(DiskBufferPool &buffer_pool, LogHandler &log_handler, TableMeta *table_meta, LobFileHandler* lob_handler); /** * @brief 关闭,做一些资源清理的工作 @@ -388,6 +396,8 @@ class RecordFileHandler */ RC insert_record(const char *data, int record_size, RID *rid); + RC insert_chunk(const Chunk& chunk, int record_size); + /** * @brief 数据库恢复时,在指定文件指定位置插入数据 * @@ -414,6 +424,7 @@ class RecordFileHandler common::Mutex lock_; ///< 当编译时增加-DCONCURRENCY=ON 选项时,才会真正的支持并发 StorageFormat storage_format_; TableMeta *table_meta_; + LobFileHandler* lob_handler_ = nullptr; }; /** diff --git a/src/observer/storage/table/heap_table_engine.cpp b/src/observer/storage/table/heap_table_engine.cpp index 46144ccac..6348c60e8 100644 --- a/src/observer/storage/table/heap_table_engine.cpp +++ b/src/observer/storage/table/heap_table_engine.cpp @@ -61,6 +61,19 @@ RC HeapTableEngine::insert_record(Record &record) return rc; } +RC HeapTableEngine::insert_chunk(const Chunk& chunk) +{ + RC rc = RC::SUCCESS; + rc = record_handler_->insert_chunk(chunk, table_meta_->record_size()); + if (rc != RC::SUCCESS) { + LOG_ERROR("Insert chunk failed. table name=%s, rc=%s", table_meta_->name(), strrc(rc)); + return rc; + } + + // TODO: insert chunk support update index + return rc; +} + RC HeapTableEngine::visit_record(const RID &rid, function visitor) { return record_handler_->visit_record(rid, visitor); @@ -285,7 +298,7 @@ RC HeapTableEngine::init() record_handler_ = new RecordFileHandler(table_meta_->storage_format()); - rc = record_handler_->init(*data_buffer_pool_, db_->log_handler(), table_meta_); + rc = record_handler_->init(*data_buffer_pool_, db_->log_handler(), table_meta_, table_->lob_handler_); if (rc != RC::SUCCESS) { LOG_ERROR("Failed to init record handler. rc=%s", strrc(rc)); delete record_handler_; diff --git a/src/observer/storage/table/heap_table_engine.h b/src/observer/storage/table/heap_table_engine.h index 053921fc9..9d64b1d30 100644 --- a/src/observer/storage/table/heap_table_engine.h +++ b/src/observer/storage/table/heap_table_engine.h @@ -27,6 +27,7 @@ class HeapTableEngine : public TableEngine ~HeapTableEngine() override; RC insert_record(Record &record) override; + RC insert_chunk(const Chunk& chunk) override; RC delete_record(const Record &record) override; RC insert_record_with_trx(Record &record, Trx *trx) override { return RC::UNSUPPORTED; } RC delete_record_with_trx(const Record &record, Trx *trx) override { return RC::UNSUPPORTED; } diff --git a/src/observer/storage/table/lsm_table_engine.h b/src/observer/storage/table/lsm_table_engine.h index a495ce3dd..be97cacd9 100644 --- a/src/observer/storage/table/lsm_table_engine.h +++ b/src/observer/storage/table/lsm_table_engine.h @@ -29,6 +29,7 @@ class LsmTableEngine : public TableEngine ~LsmTableEngine() override = default; RC insert_record(Record &record) override; + RC insert_chunk(const Chunk &chunk) override { return RC::UNIMPLEMENTED; } RC delete_record(const Record &record) override { return RC::UNIMPLEMENTED; } RC insert_record_with_trx(Record &record, Trx *trx) override { return RC::UNIMPLEMENTED; } RC delete_record_with_trx(const Record &record, Trx *trx) override { return RC::UNIMPLEMENTED; } diff --git a/src/observer/storage/table/table.cpp b/src/observer/storage/table/table.cpp index 312eadf8a..14093fc06 100644 --- a/src/observer/storage/table/table.cpp +++ b/src/observer/storage/table/table.cpp @@ -37,6 +37,10 @@ See the Mulan PSL v2 for more details. 
*/ Table::~Table() { + if (lob_handler_ != nullptr) { + delete lob_handler_; + lob_handler_ = nullptr; + } } RC Table::create(Db *db, int32_t table_id, const char *path, const char *name, const char *base_dir, @@ -102,13 +106,6 @@ RC Table::create(Db *db, int32_t table_id, const char *path, const char *name, c return rc; } - // rc = init_record_handler(base_dir); - // if (rc != RC::SUCCESS) { - // LOG_ERROR("Failed to create table %s due to init record handler failed.", data_file.c_str()); - // // don't need to remove the data_file - // return rc; - // } - if (table_meta_.storage_engine() == StorageEngine::HEAP) { engine_ = make_unique(&table_meta_, db_, this); } else if (table_meta_.storage_engine() == StorageEngine::LSM) { @@ -180,6 +177,11 @@ RC Table::insert_record(Record &record) return engine_->insert_record(record); } +RC Table::insert_chunk(const Chunk& chunk) +{ + return engine_->insert_chunk(chunk); +} + RC Table::visit_record(const RID &rid, function visitor) { return engine_->visit_record(rid, visitor); diff --git a/src/observer/storage/table/table.h b/src/observer/storage/table/table.h index 5c5071366..c9d089818 100644 --- a/src/observer/storage/table/table.h +++ b/src/observer/storage/table/table.h @@ -16,6 +16,8 @@ See the Mulan PSL v2 for more details. */ #include "storage/table/table_meta.h" #include "storage/table/table_engine.h" +#include "storage/common/chunk.h" +#include "storage/record/lob_handler.h" #include "common/types.h" #include "common/lang/span.h" #include "common/lang/functional.h" @@ -82,6 +84,8 @@ class Table * @param record[in/out] 传入的数据包含具体的数据,插入成功会通过此字段返回RID */ RC insert_record(Record &record); + + RC insert_chunk(const Chunk& chunk); RC delete_record(const Record &record); RC insert_record_with_trx(Record &record, Trx *trx); @@ -113,6 +117,8 @@ class Table const TableMeta &table_meta() const; + LobFileHandler* lob_handler() const { return lob_handler_; } + RC sync(); private: @@ -132,4 +138,5 @@ class Table // RecordFileHandler *record_handler_ = nullptr; /// 记录操作 // vector indexes_; unique_ptr engine_ = nullptr; + LobFileHandler* lob_handler_ = nullptr; }; diff --git a/src/observer/storage/table/table_engine.h b/src/observer/storage/table/table_engine.h index aabd0222a..2e0a0d46e 100644 --- a/src/observer/storage/table/table_engine.h +++ b/src/observer/storage/table/table_engine.h @@ -13,6 +13,7 @@ See the Mulan PSL v2 for more details. */ #include "common/types.h" #include "common/lang/functional.h" #include "storage/table/table_meta.h" +#include "storage/common/chunk.h" struct RID; class Record; @@ -38,6 +39,7 @@ class TableEngine virtual ~TableEngine() = default; virtual RC insert_record(Record &record) = 0; + virtual RC insert_chunk(const Chunk& chunk) = 0; virtual RC delete_record(const Record &record) = 0; virtual RC insert_record_with_trx(Record &record, Trx *trx) = 0; virtual RC delete_record_with_trx(const Record &record, Trx *trx) = 0; diff --git a/test/case/test/vectorized-order-by-limit.test b/test/case/test/vectorized-order-by-limit.test new file mode 100644 index 000000000..e1c7a9612 --- /dev/null +++ b/test/case/test/vectorized-order-by-limit.test @@ -0,0 +1,37 @@ +-- echo 1. create table +set execution_mode='chunk_iterator'; + +create table t_order_by(id int, score float, name char) storage format=pax; +create table t_order_by_2(id int, age int) storage format=pax; + +-- echo 2. 
insert records +insert into t_order_by values(3, 1.0, 'a'); +insert into t_order_by values(1, 2.0, 'b'); +insert into t_order_by values(4, 3.0, 'c'); +insert into t_order_by values(3, 2.0, 'c'); +insert into t_order_by values(3, 4.0, 'c'); +insert into t_order_by values(3, 3.0, 'd'); +insert into t_order_by values(3, 2.0, 'f'); + +insert into t_order_by_2 values(1, 10); +insert into t_order_by_2 values(2, 20); +insert into t_order_by_2 values(3, 10); +insert into t_order_by_2 values(3, 20); +insert into t_order_by_2 values(3, 40); +insert into t_order_by_2 values(4, 20); + +-- sort select * from t_order_by order by id; + +-- sort select * from t_order_by order by id asc limit 1; + +-- sort select * from t_order_by order by id desc limit 10; + +-- sort select * from t_order_by order by score desc limit 100; + +-- echo 4. order by more than one fields +select * from t_order_by order by id, score; + +select * from t_order_by order by id desc, score asc limit 1; + +-- echo 5. order by associate with where condition +select * from t_order_by where id=3 order by score desc limit 100; diff --git a/unittest/observer/pax_storage_test.cpp b/unittest/observer/pax_storage_test.cpp index 1ba5de02a..0b02cefc5 100644 --- a/unittest/observer/pax_storage_test.cpp +++ b/unittest/observer/pax_storage_test.cpp @@ -37,7 +37,7 @@ using namespace common; class PaxRecordFileScannerWithParam : public testing::TestWithParam {}; -TEST_P(PaxRecordFileScannerWithParam, DISABLED_test_file_iterator) +TEST_P(PaxRecordFileScannerWithParam, test_file_iterator) { int record_insert_num = GetParam(); VacuousLogHandler log_handler; @@ -64,7 +64,7 @@ TEST_P(PaxRecordFileScannerWithParam, DISABLED_test_file_iterator) table_meta.fields_[1].field_id_ = 1; RecordFileHandler file_handler(StorageFormat::PAX_FORMAT); - rc = file_handler.init(*bp, log_handler, &table_meta); + rc = file_handler.init(*bp, log_handler, &table_meta, nullptr); ASSERT_EQ(rc, RC::SUCCESS); VacuousTrx trx; @@ -183,7 +183,7 @@ TEST_P(PaxRecordFileScannerWithParam, DISABLED_test_file_iterator) class PaxPageHandlerTestWithParam : public testing::TestWithParam {}; -TEST_P(PaxPageHandlerTestWithParam, DISABLED_PaxPageHandler) +TEST_P(PaxPageHandlerTestWithParam, PaxPageHandler) { int record_num = GetParam(); VacuousLogHandler log_handler; diff --git a/unittest/observer/record_manager_test.cpp b/unittest/observer/record_manager_test.cpp index cf1c372b2..2c46419c9 100644 --- a/unittest/observer/record_manager_test.cpp +++ b/unittest/observer/record_manager_test.cpp @@ -140,7 +140,7 @@ TEST(RecordScanner, test_record_file_iterator) ASSERT_EQ(rc, RC::SUCCESS); RecordFileHandler file_handler(StorageFormat::ROW_FORMAT); - rc = file_handler.init(*bp, log_handler, nullptr); + rc = file_handler.init(*bp, log_handler, nullptr, nullptr); ASSERT_EQ(rc, RC::SUCCESS); VacuousTrx trx; @@ -242,7 +242,7 @@ TEST(RecordManager, durability) ASSERT_NE(buffer_pool, nullptr); RecordFileHandler record_file_handler(StorageFormat::ROW_FORMAT); - ASSERT_EQ(record_file_handler.init(*buffer_pool, log_handler, nullptr), RC::SUCCESS); + ASSERT_EQ(record_file_handler.init(*buffer_pool, log_handler, nullptr, nullptr), RC::SUCCESS); const int record_size = 100; const char record_data[record_size] = "hello, world!"; @@ -351,7 +351,7 @@ TEST(RecordManager, durability) ASSERT_EQ(log_handler2.start(), RC::SUCCESS); RecordFileHandler record_file_handler2(StorageFormat::ROW_FORMAT); - ASSERT_EQ(record_file_handler2.init(*buffer_pool2, log_handler2, nullptr), RC::SUCCESS); + 
ASSERT_EQ(record_file_handler2.init(*buffer_pool2, log_handler2, nullptr, nullptr), RC::SUCCESS); for (const auto &[rid, record] : record_map) { Record record_data; ASSERT_EQ(record_file_handler2.get_record(rid, record_data), RC::SUCCESS);
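
Taken together, the record-manager, table-engine, and test changes in this patch add a fourth parameter to RecordFileHandler::init(): an optional LobFileHandler backing a per-table .lob file (see table_lob_file() in meta_util). The sketch below shows roughly how a caller might open or create that file and hand it to the handler; open_lob_file_for() and init_record_handler() are hypothetical helpers written only for illustration, the actual call sites being HeapTableEngine::init() and the updated benchmarks and unit tests.

```cpp
// Rough sketch of the new LobFileHandler plumbing; helper names are
// hypothetical and only the signatures of the patched classes are assumed.
#include <string>

#include "storage/common/meta_util.h"
#include "storage/record/lob_handler.h"
#include "storage/record/record_manager.h"

RC open_lob_file_for(const char *base_dir, const char *table_name, LobFileHandler &lob_handler)
{
  std::string lob_file = table_lob_file(base_dir, table_name);
  RC rc = lob_handler.open_file(lob_file.c_str());
  if (rc == RC::FILE_NOT_EXIST) {
    // First use of this table: create the .lob file; create_file() now keeps
    // the descriptor open (see the PersistHandler::create_file change above).
    rc = lob_handler.create_file(lob_file.c_str());
  }
  return rc;
}

RC init_record_handler(DiskBufferPool &buffer_pool, LogHandler &log_handler, TableMeta *table_meta,
    LobFileHandler *lob_handler, RecordFileHandler &record_handler)
{
  // Passing nullptr simply disables LOB support, which is why the benchmark
  // and unit-test call sites in this patch only add a trailing nullptr.
  return record_handler.init(buffer_pool, log_handler, table_meta, lob_handler);
}
```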