diff --git a/CMakeLists.txt b/CMakeLists.txt index 1e89b54c0..83be94c6a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -138,6 +138,7 @@ set(BUSTUB_THIRD_PARTY_INCLUDE_DIR ${PROJECT_SOURCE_DIR}/third_party/libpg_query/include ${PROJECT_SOURCE_DIR}/third_party/argparse/include ${PROJECT_SOURCE_DIR}/third_party/cpp_random_distributions + ${PROJECT_SOURCE_DIR}/third_party/csv2/include ) include_directories(${BUSTUB_SRC_INCLUDE_DIR} ${BUSTUB_TEST_INCLUDE_DIR} ${BUSTUB_THIRD_PARTY_INCLUDE_DIR}) diff --git a/src/binder/CMakeLists.txt b/src/binder/CMakeLists.txt index 506a5a984..490c3d47e 100644 --- a/src/binder/CMakeLists.txt +++ b/src/binder/CMakeLists.txt @@ -7,6 +7,7 @@ add_library( bind_create.cpp bind_insert.cpp bind_select.cpp + bind_copy.cpp bind_variable.cpp bound_statement.cpp fmt_impl.cpp diff --git a/src/binder/bind_copy.cpp b/src/binder/bind_copy.cpp new file mode 100644 index 000000000..36bf01ff6 --- /dev/null +++ b/src/binder/bind_copy.cpp @@ -0,0 +1,53 @@ +#include +#include +#include +#include + +#include "binder/binder.h" +#include "binder/bound_expression.h" +#include "binder/bound_order_by.h" +#include "binder/bound_table_ref.h" +#include "binder/expressions/bound_column_ref.h" +#include "binder/expressions/bound_constant.h" +#include "binder/statement/copy_statement.h" +#include "binder/statement/delete_statement.h" +#include "binder/statement/insert_statement.h" +#include "binder/statement/select_statement.h" +#include "binder/statement/update_statement.h" +#include "binder/tokens.h" +#include "common/exception.h" +#include "common/macros.h" +#include "common/util/string_util.h" +#include "nodes/parsenodes.hpp" +#include "type/value_factory.h" + +namespace bustub { + +auto Binder::BindCopy(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr { + if (pg_stmt->is_from) { + return BindCopyFrom(pg_stmt); + } + return BindCopyTo(pg_stmt); +} + +auto Binder::BindCopyFrom(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr { + auto table = 
BindBaseTableRef(pg_stmt->relation->relname, std::nullopt); + + std::vector> exprs; + if (pg_stmt->attlist != nullptr) { + for (auto col_node = pg_stmt->attlist->head; col_node != nullptr; col_node = col_node->next) { + auto target = reinterpret_cast(col_node->data.ptr_value); + if (target->name != nullptr) { + auto column = ResolveColumnRefFromBaseTableRef(*table, std::vector{std::string{target->name}}); + exprs.emplace_back(std::move(column)); + } + } + } + return std::make_unique(std::move(table), std::move(exprs), pg_stmt->filename, true); +} + +auto Binder::BindCopyTo(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr { + // SelectNode: use pg_stmt->query + throw NotImplementedException("copyTo() not implemented"); +} +}; // namespace bustub \ No newline at end of file diff --git a/src/binder/statement/CMakeLists.txt b/src/binder/statement/CMakeLists.txt index ed93337cb..603c46d63 100644 --- a/src/binder/statement/CMakeLists.txt +++ b/src/binder/statement/CMakeLists.txt @@ -2,6 +2,7 @@ add_library( bustub_statement OBJECT create_statement.cpp + copy_statement.cpp delete_statement.cpp explain_statement.cpp index_statement.cpp diff --git a/src/binder/statement/copy_statement.cpp b/src/binder/statement/copy_statement.cpp new file mode 100644 index 000000000..65a95769a --- /dev/null +++ b/src/binder/statement/copy_statement.cpp @@ -0,0 +1,28 @@ +#include "binder/statement/copy_statement.h" +#include "binder/bound_statement.h" +#include "catalog/column.h" +#include "common/enums/statement_type.h" +#include "common/util/string_util.h" +#include "fmt/ranges.h" + +#include + +namespace bustub { + +CopyStatement::CopyStatement(std::unique_ptr table, + std::vector> columns, std::string file_path, bool is_from) + : BoundStatement(StatementType::COPY_STATEMENT), + table_(std::move(table)), + columns_(std::move(columns)), + file_path_(std::move(file_path)), + is_from_(is_from) { + if (StringUtil::EndsWith(file_path_, ".csv")) { // the file_path parameter is moved-from by this point + SetCSVFormat(); + } +} + +auto 
CopyStatement::ToString() const -> std::string { + return fmt::format("BoundCopy {{ table={}, filename={} }}", table_, file_path_); +} + +}; // namespace bustub \ No newline at end of file diff --git a/src/binder/statement/delete_statement.cpp b/src/binder/statement/delete_statement.cpp index 8c0ab3340..bf0457b34 100644 --- a/src/binder/statement/delete_statement.cpp +++ b/src/binder/statement/delete_statement.cpp @@ -7,7 +7,7 @@ DeleteStatement::DeleteStatement(std::unique_ptr table, std:: : BoundStatement(StatementType::DELETE_STATEMENT), table_(std::move(table)), expr_(std::move(expr)) {} auto DeleteStatement::ToString() const -> std::string { - return fmt::format("Delete {{ table={}, expr={} }}", *table_, *expr_); + return fmt::format("BoundDelete {{ table={}, expr={} }}", *table_, *expr_); } } // namespace bustub diff --git a/src/binder/transformer.cpp b/src/binder/transformer.cpp index e15e65adf..45b457f5a 100644 --- a/src/binder/transformer.cpp +++ b/src/binder/transformer.cpp @@ -57,6 +57,8 @@ auto Binder::BindStatement(duckdb_libpgquery::PGNode *stmt) -> std::unique_ptr(stmt)); case duckdb_libpgquery::T_PGInsertStmt: return BindInsert(reinterpret_cast(stmt)); + case duckdb_libpgquery::T_PGCopyStmt: + return BindCopy(reinterpret_cast(stmt)); case duckdb_libpgquery::T_PGSelectStmt: return BindSelect(reinterpret_cast(stmt)); case duckdb_libpgquery::T_PGExplainStmt: diff --git a/src/execution/CMakeLists.txt b/src/execution/CMakeLists.txt index 313520a90..e3236f94d 100644 --- a/src/execution/CMakeLists.txt +++ b/src/execution/CMakeLists.txt @@ -2,6 +2,7 @@ add_library( bustub_execution OBJECT aggregation_executor.cpp + copy_from_executor.cpp delete_executor.cpp executor_factory.cpp filter_executor.cpp diff --git a/src/execution/copy_from_executor.cpp b/src/execution/copy_from_executor.cpp new file mode 100644 index 000000000..3466821bf --- /dev/null +++ b/src/execution/copy_from_executor.cpp @@ -0,0 +1,43 @@ + +#include 
"execution/executors/copy_from_executor.h" +#include +#include "csv2/parameters.hpp" + +namespace bustub { + +CopyFromExecutor::CopyFromExecutor(ExecutorContext *exec_ctx, const CopyFromPlanNode *plan) + : AbstractExecutor(exec_ctx), plan_(plan) { + table_info_ = exec_ctx_->GetCatalog()->GetTable(plan_->TableOid()); + // if (plan_->file_type_ == 1) { + // file_ = std::make_shared(plan_->file_type_, ',', true); + // } else { + // file_ = nullptr; + // } +} + +void CopyFromExecutor::Init() {} + +auto CopyFromExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool { + if (read_finished_) { + return false; + } + using csv2::delimiter; + using csv2::first_row_is_header; + using csv2::quote_character; + csv2::Reader, quote_character<'"'>, first_row_is_header, csv2::trim_policy::trim_whitespace> csv; + ; + if (csv.mmap(plan_->file_path_)) { + // walk every row the CSV reader yields from the mapped file + for (const auto row : csv) { + // read each cell into a string; the value is not used yet + for (const auto cell : row) { + std::string val; + cell.read_value(val); + } + } + } + read_finished_ = true; // makes every later call return false + return true; // NOTE(review): returns true without writing *tuple or *rid - confirm intended +} + +} // namespace bustub \ No newline at end of file diff --git a/src/execution/executor_factory.cpp b/src/execution/executor_factory.cpp index fd42b3b3b..960702ac7 100644 --- a/src/execution/executor_factory.cpp +++ b/src/execution/executor_factory.cpp @@ -17,6 +17,7 @@ #include "execution/executors/abstract_executor.h" #include "execution/executors/aggregation_executor.h" +#include "execution/executors/copy_from_executor.h" #include "execution/executors/delete_executor.h" #include "execution/executors/filter_executor.h" #include "execution/executors/hash_join_executor.h" @@ -34,6 +35,7 @@ #include "execution/executors/topn_executor.h" #include "execution/executors/update_executor.h" #include "execution/executors/values_executor.h" +#include "execution/plans/copy_from_plan.h" #include "execution/plans/filter_plan.h" #include "execution/plans/mock_scan_plan.h" #include "execution/plans/projection_plan.h" @@ -58,6 +60,11 @@ auto 
ExecutorFactory::CreateExecutor(ExecutorContext *exec_ctx, const AbstractPl return std::make_unique(exec_ctx, dynamic_cast(plan.get())); } + // Create a new copyfrom executor + case PlanType::CopyFrom: { + return std::make_unique(exec_ctx, dynamic_cast(plan.get())); + } + // Create a new insert executor case PlanType::Insert: { auto insert_plan = dynamic_cast(plan.get()); diff --git a/src/include/binder/binder.h b/src/include/binder/binder.h index ad77a061f..93fa538e5 100644 --- a/src/include/binder/binder.h +++ b/src/include/binder/binder.h @@ -37,6 +37,7 @@ #include #include "binder/simplified_token.h" +#include "binder/statement/copy_statement.h" #include "binder/statement/select_statement.h" #include "binder/statement/set_show_statement.h" #include "binder/tokens.h" @@ -184,6 +185,12 @@ class Binder { auto BindInsert(duckdb_libpgquery::PGInsertStmt *pg_stmt) -> std::unique_ptr; + auto BindCopy(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr; + + auto BindCopyFrom(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr; + + auto BindCopyTo(duckdb_libpgquery::PGCopyStmt *pg_stmt) -> std::unique_ptr; + auto BindValuesList(duckdb_libpgquery::PGList *list) -> std::unique_ptr; auto BindLimitCount(duckdb_libpgquery::PGNode *root) -> std::unique_ptr; diff --git a/src/include/binder/statement/copy_statement.h b/src/include/binder/statement/copy_statement.h new file mode 100644 index 000000000..897eb4ce2 --- /dev/null +++ b/src/include/binder/statement/copy_statement.h @@ -0,0 +1,41 @@ +#pragma once + +#include +#include + +#include "binder/bound_expression.h" +#include "binder/bound_statement.h" +#include "binder/expressions/bound_column_ref.h" +#include "binder/statement/insert_statement.h" +#include "binder/table_ref/bound_base_table_ref.h" +#include "catalog/column.h" + +namespace bustub { + +enum class CopyFileFormat : uint8_t { + NONE = 0, + CSV = 1, + TBL = 2, +}; + +class CopyStatement : public BoundStatement { + public: + explicit 
CopyStatement(std::unique_ptr table, std::vector> columns, + std::string file_path, bool is_from); + + void SetCSVFormat() { format_ = CopyFileFormat::CSV; } + + std::unique_ptr table_; + + std::vector> columns_; + + std::string file_path_; + + bool is_from_; + + CopyFileFormat format_{CopyFileFormat::NONE}; + + auto ToString() const -> std::string override; +}; + +} // namespace bustub \ No newline at end of file diff --git a/src/include/common/enums/statement_type.h b/src/include/common/enums/statement_type.h index c915f85e8..27a9be054 100644 --- a/src/include/common/enums/statement_type.h +++ b/src/include/common/enums/statement_type.h @@ -26,6 +26,7 @@ enum class StatementType : uint8_t { INSERT_STATEMENT, // insert statement type UPDATE_STATEMENT, // update statement type CREATE_STATEMENT, // create statement type + COPY_STATEMENT, // copy statement type DELETE_STATEMENT, // delete statement type EXPLAIN_STATEMENT, // explain statement type DROP_STATEMENT, // drop statement type @@ -51,6 +52,9 @@ struct fmt::formatter : formatter { case bustub::StatementType::INSERT_STATEMENT: name = "Insert"; break; + case bustub::StatementType::COPY_STATEMENT: + name = "Copy"; + break; case bustub::StatementType::UPDATE_STATEMENT: name = "Update"; break; diff --git a/src/include/execution/executors/copy_from_executor.h b/src/include/execution/executors/copy_from_executor.h new file mode 100644 index 000000000..1a219b762 --- /dev/null +++ b/src/include/execution/executors/copy_from_executor.h @@ -0,0 +1,54 @@ +#pragma once + +#include +#include +#include + +#include "execution/executor_context.h" +#include "execution/executors/abstract_executor.h" +#include "execution/plans/copy_from_plan.h" +#include "storage/table/tuple.h" + +namespace bustub { + +struct BaseFileFormat { + uint8_t file_type_; + explicit BaseFileFormat(uint8_t file_type) : file_type_(file_type) {} + virtual ~BaseFileFormat() = default; +}; + +struct CSVFileFormat : BaseFileFormat { + uint8_t delimiter_ = ' 
'; + + bool has_header_; + + CSVFileFormat(uint8_t file_type, uint8_t delimiter, bool has_header) + : BaseFileFormat(file_type), delimiter_(delimiter), has_header_(has_header) {} + + ~CSVFileFormat() override = default; +}; + +/** + * CopyFromExecutor executes a copy on a table from file. + */ +class CopyFromExecutor : public AbstractExecutor { + public: + CopyFromExecutor(ExecutorContext *exec_ctx, const CopyFromPlanNode *plan); + + /** Initialize the copy */ + void Init() override; + + auto Next([[maybe_unused]] Tuple *tuple, RID *rid) -> bool override; + + auto GetOutputSchema() const -> const Schema & override { return plan_->OutputSchema(); }; + + private: + const CopyFromPlanNode *plan_; + + const bustub::TableInfo *table_info_; + + bool read_finished_{false}; + // std::shared_ptr file_; +}; + +} // namespace bustub \ No newline at end of file diff --git a/src/include/execution/plans/abstract_plan.h b/src/include/execution/plans/abstract_plan.h index 312e56ed6..41cced30b 100644 --- a/src/include/execution/plans/abstract_plan.h +++ b/src/include/execution/plans/abstract_plan.h @@ -48,7 +48,8 @@ enum class PlanType { Sort, TopN, MockScan, - InitCheck + InitCheck, + CopyFrom, }; class AbstractPlanNode; diff --git a/src/include/execution/plans/copy_from_plan.h b/src/include/execution/plans/copy_from_plan.h new file mode 100644 index 000000000..fdd1f6540 --- /dev/null +++ b/src/include/execution/plans/copy_from_plan.h @@ -0,0 +1,44 @@ +#pragma once + +#include +#include + +#include "catalog/catalog.h" +#include "execution/expressions/abstract_expression.h" +#include "execution/plans/abstract_plan.h" +#include "fmt/core.h" + +namespace bustub { + +class CopyFromPlanNode : public AbstractPlanNode { + public: + /** + * Creates a new physical copy file plan node. 
+ * @param output the output format of this scan plan node + */ + explicit CopyFromPlanNode(SchemaRef output, table_oid_t table_oid, uint8_t file_type) + : AbstractPlanNode(std::move(output), {}), table_oid_(table_oid), file_type_(file_type) {} + + /** @return The type of the plan node */ + auto GetType() const -> PlanType override { return PlanType::CopyFrom; } + + /** @return The identifier of the table into which tuples are inserted */ + auto TableOid() const -> table_oid_t { return table_oid_; } + + BUSTUB_PLAN_NODE_CLONE_WITH_CHILDREN(CopyFromPlanNode); + + /** the relative file path */ + std::string file_path_; + + /** The table to be inserted into. */ + table_oid_t table_oid_; + + uint8_t file_type_; + + private: + auto PlanNodeToString() const -> std::string override { + return fmt::format("CopyFromFile {{ file_path={} }}", file_path_); + } +}; + +} // namespace bustub \ No newline at end of file diff --git a/src/include/planner/planner.h b/src/include/planner/planner.h index d2c9ec349..6ab21218f 100644 --- a/src/include/planner/planner.h +++ b/src/include/planner/planner.h @@ -19,6 +19,7 @@ namespace bustub { class BoundStatement; class SelectStatement; +class CopyStatement; class DeleteStatement; class AbstractPlanNode; class InsertStatement; @@ -148,6 +149,8 @@ class Planner { auto PlanUpdate(const UpdateStatement &statement) -> AbstractPlanNodeRef; + auto PlanCopy(const CopyStatement &statement) -> AbstractPlanNodeRef; + /** the root plan node of the plan tree */ AbstractPlanNodeRef plan_; diff --git a/src/planner/plan_insert.cpp b/src/planner/plan_insert.cpp index 83364d4ea..e91f4667d 100644 --- a/src/planner/plan_insert.cpp +++ b/src/planner/plan_insert.cpp @@ -3,6 +3,7 @@ #include #include "binder/bound_expression.h" +#include "binder/statement/copy_statement.h" #include "binder/statement/delete_statement.h" #include "binder/statement/insert_statement.h" #include "binder/statement/select_statement.h" @@ -14,6 +15,7 @@ #include 
"execution/expressions/abstract_expression.h" #include "execution/expressions/column_value_expression.h" #include "execution/plans/abstract_plan.h" +#include "execution/plans/copy_from_plan.h" #include "execution/plans/delete_plan.h" #include "execution/plans/filter_plan.h" #include "execution/plans/insert_plan.h" @@ -39,6 +41,15 @@ auto Planner::PlanInsert(const InsertStatement &statement) -> AbstractPlanNodeRe return std::make_shared(std::move(insert_schema), std::move(select), statement.table_->oid_); } +auto Planner::PlanCopy(const CopyStatement &statement) -> AbstractPlanNodeRef { + // copy from csv: the file type 1 passed below is the value of CopyFileFormat::CSV + auto copy_schema = std::make_shared<Schema>(std::vector{Column("__bustub_internal.copy_rows", TypeId::INTEGER)}); + auto copy_plan = std::make_shared<CopyFromPlanNode>(copy_schema, statement.table_->oid_, 1); + // CopyFromPlanNode's constructor takes no path, so set the member the executor reads + copy_plan->file_path_ = statement.file_path_; + return copy_plan; +} + auto Planner::PlanDelete(const DeleteStatement &statement) -> AbstractPlanNodeRef { auto table = PlanTableRef(*statement.table_); auto [_, condition] = PlanExpression(*statement.expr_, {table}); diff --git a/src/planner/planner.cpp b/src/planner/planner.cpp index f287b3cb5..461453cc8 100644 --- a/src/planner/planner.cpp +++ b/src/planner/planner.cpp @@ -4,6 +4,7 @@ #include "binder/bound_expression.h" #include "binder/bound_statement.h" #include "binder/bound_table_ref.h" +#include "binder/statement/copy_statement.h" #include "binder/statement/delete_statement.h" #include "binder/statement/insert_statement.h" #include "binder/statement/select_statement.h" @@ -29,6 +30,10 @@ void Planner::PlanQuery(const BoundStatement &statement) { plan_ = PlanSelect(dynamic_cast(statement)); return; } + case StatementType::COPY_STATEMENT: { + plan_ = PlanCopy(dynamic_cast(statement)); + return; + } case StatementType::INSERT_STATEMENT: { plan_ = PlanInsert(dynamic_cast(statement)); return; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 09a63d454..541f974a5 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -52,6 +52,7 @@ 
set(BUSTUB_SLT_SOURCES "${PROJECT_SOURCE_DIR}/test/sql/p0.01-lower-upper.slt" "${PROJECT_SOURCE_DIR}/test/sql/p0.02-function-error.slt" "${PROJECT_SOURCE_DIR}/test/sql/p0.03-string-scan.slt" + "${PROJECT_SOURCE_DIR}/test/sql/p0.04-copy-from.slt" "${PROJECT_SOURCE_DIR}/test/sql/p3.01-seqscan.slt" "${PROJECT_SOURCE_DIR}/test/sql/p3.02-insert.slt" "${PROJECT_SOURCE_DIR}/test/sql/p3.03-update.slt" diff --git a/test/binder/binder_test.cpp b/test/binder/binder_test.cpp index 10e9bddbc..49f88e1ae 100644 --- a/test/binder/binder_test.cpp +++ b/test/binder/binder_test.cpp @@ -94,6 +94,16 @@ TEST(BinderTest, BindSelectExpr) { PrintStatements(statements); } +TEST(BinderTest, BindCopyFrom) { + auto statements = TryBind("copy y from 'test.csv'"); + PrintStatements(statements); +} + +TEST(BinderTest, DISABLED_BindCopyTo) { + auto statements = TryBind("copy a to 'b.csv'"); + PrintStatements(statements); +} + TEST(BinderTest, BindAgg) { auto statements = TryBind("select z, max(a), min(b), first(c) from y group by z having max(a) > 0"); PrintStatements(statements); diff --git a/test/csv/simple_test.csv b/test/csv/simple_test.csv new file mode 100644 index 000000000..7eb5f22cb --- /dev/null +++ b/test/csv/simple_test.csv @@ -0,0 +1,5 @@ +name, id, favorite food +quincy, 1, hot dogs +beau, 2, cereal +abbey, 3, pizza +mrugesh, 4, ice cream \ No newline at end of file diff --git a/test/sql/null.csv b/test/sql/null.csv new file mode 100644 index 000000000..e69de29bb diff --git a/test/sql/p0.04-copy-from.slt b/test/sql/p0.04-copy-from.slt new file mode 100644 index 000000000..6a26fd94d --- /dev/null +++ b/test/sql/p0.04-copy-from.slt @@ -0,0 +1,5 @@ +statement ok +create table t1(v1 int); + +statement ok +copy t1 from 'null.csv'; diff --git a/third_party/CMakeLists.txt b/third_party/CMakeLists.txt index a992ea0a1..8feee207c 100644 --- a/third_party/CMakeLists.txt +++ b/third_party/CMakeLists.txt @@ -1,6 +1,6 @@ add_subdirectory(murmur3) add_subdirectory(libpg_query) - 
+add_subdirectory(csv2) set(gtest_force_shared_crt ON CACHE BOOL "" FORCE) # don't override our compiler/linker options when building gtest add_subdirectory(googletest) diff --git a/third_party/csv2 b/third_party/csv2 new file mode 160000 index 000000000..7a2de1aec --- /dev/null +++ b/third_party/csv2 @@ -0,0 +1 @@ +Subproject commit 7a2de1aec083d82e43fe9539bb59b3c619c93221 diff --git a/third_party/versions.txt b/third_party/versions.txt index f2fd29525..18495f345 100644 --- a/third_party/versions.txt +++ b/third_party/versions.txt @@ -64,3 +64,8 @@ # tag: v2.7.0 # commit hash: 8ca6144c85c165987cb1c5d8395c7314e13d4cd7 # commit hash date: Dec 16 2021 + +# csv2 +# url: https://github.com/p-ranav/csv2 +# commit hash: 7a2de1aec083d82e43fe9539bb59b3c619c93221 +# commit hash date: Mar 26 2023 \ No newline at end of file