diff --git a/src/include/debug_utils.hpp b/src/include/debug_utils.hpp new file mode 100644 index 0000000..0d3f790 --- /dev/null +++ b/src/include/debug_utils.hpp @@ -0,0 +1,30 @@ +//===----------------------------------------------------------------------===// +// RPT Extension +// +// debug_utils.hpp +// +// Debug printing utilities - prints only in debug builds +//===----------------------------------------------------------------------===// + +#pragma once + +#include "duckdb/common/printer.hpp" +#include "duckdb/common/string_util.hpp" + +namespace duckdb { + +// debug print macro - only prints in debug builds, no-op in release +#ifdef DEBUG + +#define D_PRINT(...) Printer::Print(__VA_ARGS__) +#define D_PRINTF(...) Printer::PrintF(__VA_ARGS__) + +#else + +#define D_PRINT(...) ((void)0) +#define D_PRINTF(...) ((void)0) + +#endif + +} // namespace duckdb + diff --git a/src/operators/logical_use_bf.cpp b/src/operators/logical_use_bf.cpp index 7d3d2bc..d08b50c 100644 --- a/src/operators/logical_use_bf.cpp +++ b/src/operators/logical_use_bf.cpp @@ -1,6 +1,7 @@ #include "logical_use_bf.hpp" #include "physical_use_bf.hpp" #include "dag.hpp" +#include "debug_utils.hpp" namespace duckdb { @@ -54,23 +55,27 @@ PhysicalOperator &LogicalUseBF::CreatePlan(ClientContext &context, PhysicalPlanG // step 2: resolve/map the bf operation probe columns to chunk column indices vector resolved_indices; + +#ifdef DEBUG Printer::Print(StringUtil::Format("[RESOLVE] LogicalUseBF probe_table=%llu has %zu probe_columns", - bf_operation.probe_table_idx, bf_operation.probe_columns.size())); + (unsigned long long)bf_operation.probe_table_idx, bf_operation.probe_columns.size())); Printer::Print(StringUtil::Format("[RESOLVE] child_bindings.size()=%zu", child_bindings.size())); for (idx_t j = 0; j < child_bindings.size(); j++) { Printer::Print(StringUtil::Format(" child_bindings[%llu] = table_idx=%llu, col_idx=%llu", - j, child_bindings[j].table_index, child_bindings[j].column_index)); + (unsigned long long)j, (unsigned long long)child_bindings[j].table_index, + (unsigned long long)child_bindings[j].column_index)); } +#endif for (const ColumnBinding &column_binding: bf_operation.probe_columns) { - Printer::Print(StringUtil::Format("[RESOLVE] Looking for probe_column: table_idx=%llu, col_idx=%llu", - column_binding.table_index, column_binding.column_index)); + D_PRINTF("[RESOLVE] Looking for probe_column: table_idx=%llu, col_idx=%llu", + (unsigned long long)column_binding.table_index, (unsigned long long)column_binding.column_index); // find the position of the bf column ColumnBinding in the chunk columns for (idx_t i = 0; i < child_bindings.size(); i++) { if (child_bindings[i].table_index == column_binding.table_index && child_bindings[i].column_index == column_binding.column_index) { resolved_indices.push_back(i); - Printer::Print(StringUtil::Format("[RESOLVE] Matched at chunk position %llu", i)); + D_PRINTF("[RESOLVE] Matched at chunk position %llu", (unsigned long long)i); break; } } @@ -85,14 +90,14 @@ PhysicalOperator &LogicalUseBF::CreatePlan(ClientContext &context, PhysicalPlanG resolved_indices); physical = static_cast(&physical_op); + // set up reference to related PhysicalCreateBF if available if (related_create_bf) { - string probe_table = "table_" + std::to_string(bf_operation.probe_table_idx); - Printer::Print(StringUtil::Format("[LOGICAL USE] probe table - %s Related_create_bf exists", probe_table.c_str())); + D_PRINTF("[LOGICAL USE] probe table - table_%llu Related_create_bf exists", + (unsigned long long)bf_operation.probe_table_idx); } - // set up reference to related PhysicalCreateBF if available if (related_create_bf && related_create_bf->physical) { - string probe_table = "table_" + std::to_string(bf_operation.probe_table_idx); - Printer::Print(StringUtil::Format("[LOGICAL USE] probe table - %s Related_create_bf physical exists", probe_table.c_str())); + D_PRINTF("[LOGICAL USE] probe table - table_%llu Related_create_bf physical exists", + (unsigned long long)bf_operation.probe_table_idx); physical->related_create_bf = related_create_bf->physical; physical->related_create_bf_vec.push_back(related_create_bf->physical); } diff --git a/src/operators/physical_create_bf.cpp b/src/operators/physical_create_bf.cpp index adbc495..7fe786b 100644 --- a/src/operators/physical_create_bf.cpp +++ b/src/operators/physical_create_bf.cpp @@ -4,6 +4,7 @@ #include "duckdb/execution/expression_executor.hpp" #include "duckdb/parallel/pipeline.hpp" #include "duckdb/parallel/base_pipeline_event.hpp" +#include "debug_utils.hpp" #include #include #include @@ -150,14 +151,14 @@ class CreateBFFinalizeEvent : public BasePipelineEvent { void FinishEvent() override { // mark all bloom filters as finalized after parallel building completes string build_table = sink.op.bf_operation ? "table_" + std::to_string(sink.op.bf_operation->build_table_idx) : "unknown"; - printf("[FINALIZE] CREATE_BF (build=%s): %zu bloom filters\n", - build_table.c_str(), sink.op.bloom_filter_map.size()); + D_PRINTF("[FINALIZE] CREATE_BF (build=%s): %zu bloom filters", + build_table.c_str(), sink.op.bloom_filter_map.size()); for (auto &[col, bf] : sink.op.bloom_filter_map) { if (bf) { bf->finalized_ = true; - printf("[FINALIZE] CREATE_BF (build=%s): Bloom filter for column (%llu.%llu) marked as finalized\n", - build_table.c_str(), col.table_index, col.column_index); + D_PRINTF("[FINALIZE] CREATE_BF (build=%s): Bloom filter for column (%llu.%llu) marked as finalized", + build_table.c_str(), (unsigned long long)col.table_index, (unsigned long long)col.column_index); } } } @@ -186,10 +187,8 @@ SinkFinalizeType PhysicalCreateBF::Finalize(Pipeline &pipeline, Event &event, Cl // print total data size string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; - printf("[FINALIZE] CREATE_BF (build=%s, this=%p): total_data contains %llu rows\n", - build_table.c_str(), (void*)this, gsink.total_data->Count()); - // printf("[FINALIZE] total_data: \n"); - // gsink.total_data->Print(); + D_PRINTF("[FINALIZE] CREATE_BF (build=%s): total_data contains %llu rows", + build_table.c_str(), (unsigned long long)gsink.total_data->Count()); gsink.local_data_collections.clear(); @@ -264,9 +263,11 @@ unique_ptr PhysicalCreateBF::GetGlobalSourceState(ClientConte auto chunk_count = gsink.total_data->ChunkCount(); auto row_count = gsink.total_data->Count(); +#ifdef DEBUG string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) GetGlobalSourceState: chunk_count=%llu, row_count=%llu", - build_table.c_str(), chunk_count, row_count)); + build_table.c_str(), (unsigned long long)chunk_count, (unsigned long long)row_count)); +#endif const idx_t num_threads = TaskScheduler::GetScheduler(context).NumberOfThreads(); auto chunks_per_thread = MaxValue((chunk_count + num_threads - 1) / num_threads, 1); @@ -278,8 +279,10 @@ unique_ptr PhysicalCreateBF::GetGlobalSourceState(ClientConte auto chunk_idx_from = chunk_idx; auto chunk_idx_to = MinValue(chunk_idx_from + chunks_per_thread, chunk_count); state->chunks_todo.emplace_back(chunk_idx_from, chunk_idx_to); +#ifdef DEBUG Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) Partition %llu: chunks [%llu, %llu)", - build_table.c_str(), thread_idx, chunk_idx_from, chunk_idx_to)); + build_table.c_str(), (unsigned long long)thread_idx, (unsigned long long)chunk_idx_from, (unsigned long long)chunk_idx_to)); +#endif chunk_idx = chunk_idx_to; } return unique_ptr_cast(std::move(state)); @@ -297,49 +300,54 @@ SourceResultType PhysicalCreateBF::GetData(ExecutionContext &context, DataChunk auto &lstate = input.local_state.Cast(); auto &state = input.global_state.Cast(); +#ifdef DEBUG string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; - // Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) GetData start: partition_id=%llu, chunks_todo.size()=%zu, local_current_chunk_id=%zu\n", - // build_table.c_str(), lstate.local_partition_id, state.chunks_todo.size(), lstate.local_current_chunk_id)); +#endif + if(lstate.initial) { - lstate.local_partition_id = state.partition_id++; + lstate.local_partition_id = state.partition_id.fetch_add(1); lstate.initial = false; +#ifdef DEBUG Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) GetData initial: partition_id=%llu, chunks_todo.size()=%zu", - build_table.c_str(), lstate.local_partition_id, state.chunks_todo.size())); + build_table.c_str(), (unsigned long long)lstate.local_partition_id, state.chunks_todo.size())); +#endif if (lstate.local_partition_id >= state.chunks_todo.size()) { - Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) No more partitions, returning FINISHED", build_table.c_str())); + D_PRINTF("[SOURCE] CREATE_BF No more partitions, returning FINISHED"); return SourceResultType::FINISHED; } lstate.chunk_from = state.chunks_todo[lstate.local_partition_id].first; lstate.chunk_to = state.chunks_todo[lstate.local_partition_id].second; - Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) Assigned range: [%llu, %llu)", - build_table.c_str(), lstate.chunk_from, lstate.chunk_to)); - } - - auto chunk_count = gstate.total_data->ChunkCount(); + // parallel source + lstate.local_current_chunk_id = lstate.chunk_from; - if (lstate.local_current_chunk_id >= chunk_count) { - // Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) ERROR: trying to fetch chunk_id=%llu but chunk_count=%llu", - // build_table.c_str(), lstate.local_current_chunk_id, chunk_count)); - // throw InternalException("CREATE_BF GetData: chunk_id out of bounds"); - return SourceResultType::FINISHED; +#ifdef DEBUG + Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) Assigned range: [%llu, %llu)", + build_table.c_str(), (unsigned long long)lstate.chunk_from, (unsigned long long)lstate.chunk_to)); +#endif } - if (lstate.local_current_chunk_id == 0) { - lstate.local_current_chunk_id = lstate.chunk_from; - // } else if(lstate.local_current_chunk_id >= lstate.chunk_to) { - // Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) Partition exhausted (chunk_id=%llu >= chunk_to=%llu), returning FINISHED", - // build_table.c_str(), lstate.local_current_chunk_id, lstate.chunk_to)); - // // lstate.local_current_chunk_id++; - // return SourceResultType::HAVE_MORE_OUTPUT; + // sequential source + // auto chunk_count = gstate.total_data->ChunkCount(); + // + // if (lstate.local_current_chunk_id >= chunk_count) { + // return SourceResultType::FINISHED; + // } + // + // if (lstate.local_current_chunk_id == 0) { + // lstate.local_current_chunk_id = lstate.chunk_from; + // } + + // parallel source + { + // auto chunk_count = gstate.total_data->ChunkCount(); + + if (lstate.local_current_chunk_id >= lstate.chunk_to) { + return SourceResultType::FINISHED; + } } - // else if () - - // Printer::Print(StringUtil::Format("[SOURCE] CREATE_BF (build=%s) Fetching chunk %llu (total chunks=%llu)", - // build_table.c_str(), lstate.local_current_chunk_id, chunk_count)); - gstate.total_data->FetchChunk(lstate.local_current_chunk_id++, chunk); return SourceResultType::HAVE_MORE_OUTPUT; } @@ -348,10 +356,9 @@ void PhysicalCreateBF::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipe op_state.reset(); sink_state.reset(); +#ifdef DEBUG string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; - char ptr_str[32]; - snprintf(ptr_str, sizeof(ptr_str), "%p", (void*)this); - // Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) BuildPipelines called", build_table.c_str(), ptr_str)); +#endif auto &state = meta_pipeline.GetState(); @@ -359,14 +366,14 @@ void PhysicalCreateBF::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipe state.SetPipelineSource(current, *this); if (this_pipeline == nullptr) { - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s) creating NEW child pipeline for build-side", build_table.c_str())); + D_PRINTF("[PIPELINE] CREATE_BF (build=%s) creating NEW child pipeline for build-side", build_table.c_str()); auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this); this_pipeline = child_meta_pipeline.GetBasePipeline(); // CreateChildMetaPipeline() automatically registers the child pipeline as a dependency child_meta_pipeline.Build(children[0].get()); - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s) child pipeline created", build_table.c_str())); + D_PRINTF("[PIPELINE] CREATE_BF (build=%s) child pipeline created", build_table.c_str()); } else { - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s) adding existing child pipeline as dependency", build_table.c_str())); + D_PRINTF("[PIPELINE] CREATE_BF (build=%s) adding existing child pipeline as dependency", build_table.c_str()); current.AddDependency(this_pipeline); } @@ -379,23 +386,27 @@ void PhysicalCreateBF::BuildPipelinesFromRelated(Pipeline ¤t, auto &state = meta_pipeline.GetState(); D_ASSERT(children.size() == 1); +#ifdef DEBUG string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; char ptr_str[32]; snprintf(ptr_str, sizeof(ptr_str), "%p", (void*)this); Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) BuildPipelinesFromRelated - USE_BF needs this filter", build_table.c_str(), ptr_str)); +#endif if (this_pipeline == nullptr) { - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) creating NEW child pipeline from BuildPipelinesFromRelated", build_table.c_str(), ptr_str)); + D_PRINTF("[PIPELINE] CREATE_BF creating NEW child pipeline from BuildPipelinesFromRelated"); auto &child_meta_pipeline = meta_pipeline.CreateChildMetaPipeline(current, *this); this_pipeline = child_meta_pipeline.GetBasePipeline(); child_meta_pipeline.Build(children[0].get()); - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) child pipeline created and dependency added automatically", build_table.c_str(), ptr_str)); + D_PRINT("[PIPELINE] CREATE_BF child pipeline created and dependency added automatically"); } else { - Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) adding existing pipeline as dependency", build_table.c_str(), ptr_str)); + D_PRINT("[PIPELINE] CREATE_BF adding existing pipeline as dependency"); current.AddDependency(this_pipeline); } +#ifdef DEBUG this_pipeline->Print(); +#endif } } // namespace duckdb \ No newline at end of file diff --git a/src/operators/physical_create_bf.hpp b/src/operators/physical_create_bf.hpp index 48522d2..595c80c 100644 --- a/src/operators/physical_create_bf.hpp +++ b/src/operators/physical_create_bf.hpp @@ -36,6 +36,9 @@ class PhysicalCreateBF : public PhysicalOperator { return true; } + bool ParallelSink() const override { + return true; + } // source interface unique_ptr GetGlobalSourceState(ClientContext &context) const override; unique_ptr GetLocalSourceState(ExecutionContext &context, @@ -47,6 +50,10 @@ class PhysicalCreateBF : public PhysicalOperator { return true; } + bool ParallelSource() const override { + return true; + } + void BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) override; void BuildPipelinesFromRelated(Pipeline ¤t, MetaPipeline &meta_pipeline); diff --git a/src/operators/physical_use_bf.cpp b/src/operators/physical_use_bf.cpp index 7999f84..a28a2e3 100644 --- a/src/operators/physical_use_bf.cpp +++ b/src/operators/physical_use_bf.cpp @@ -4,6 +4,7 @@ #include "duckdb/common/types/selection_vector.hpp" #include "duckdb/common/vector_operations/vector_operations.hpp" #include "duckdb/parallel/meta_pipeline.hpp" +#include "debug_utils.hpp" namespace duckdb { @@ -38,10 +39,10 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat // lazy initialization of bloom filters on first call if (!bf_state.bloom_filters_initialized) { - printf("[EXEC_INTERNAL] USE_BF (probe=%s, this=%p) Initializing bloom filters, bound_column_indices.size()=%zu\n", - table_name.c_str(), (void*)this, bound_column_indices.size()); + D_PRINTF("[EXEC_INTERNAL] USE_BF (probe=%s) Initializing bloom filters, bound_column_indices.size()=%zu", + table_name.c_str(), bound_column_indices.size()); for (size_t i = 0; i < bound_column_indices.size(); i++) { - printf(" bound_column_indices[%zu] = %llu\n", i, bound_column_indices[i]); + D_PRINTF(" bound_column_indices[%zu] = %llu", i, (unsigned long long)bound_column_indices[i]); } if (!related_create_bf_vec.empty() && bf_operation) { @@ -52,15 +53,15 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat if (bf) { string build_table = create_bf->bf_operation ? "table_" + std::to_string(create_bf->bf_operation->build_table_idx) : "unknown"; - printf("[EXEC_INTERNAL] USE_BF found bloom filter for col(%llu,%llu) from CREATE_BF (build=%s)\n", - build_col.table_index, build_col.column_index, build_table.c_str()); + D_PRINTF("[EXEC_INTERNAL] USE_BF found bloom filter for col(%llu,%llu) from CREATE_BF (build=%s)", + (unsigned long long)build_col.table_index, (unsigned long long)build_col.column_index, build_table.c_str()); bf_state.bloom_filters.push_back(bf); break; // found the filter for this column } } } } - printf("[EXEC_INTERNAL] USE_BF total bloom_filters.size() = %zu\n", bf_state.bloom_filters.size()); + D_PRINTF("[EXEC_INTERNAL] USE_BF total bloom_filters.size() = %zu", bf_state.bloom_filters.size()); bf_state.bloom_filters_initialized = true; } @@ -68,7 +69,8 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat // if no bloom filters or no input, just pass through if (bf_state.bloom_filters.empty() || row_num == 0) { - printf("[EXEC_INTERNAL] USE_BF (probe=%s, this=%p) No bloom filter input/empty, row_num = %llu\n", table_name.c_str(), (void*)this, row_num); + D_PRINTF("[EXEC_INTERNAL] USE_BF (probe=%s) No bloom filter input/empty, row_num = %llu", + table_name.c_str(), (unsigned long long)row_num); chunk.Reference(input); return OperatorResultType::NEED_MORE_INPUT; } @@ -80,14 +82,14 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat for (int i = 0; i < bf_state.bloom_filters.size(); i++) { auto bf = bf_state.bloom_filters[i]; if (!bf || !bf->finalized_) { - printf("skipppppped"); + D_PRINT("skipped - bloom filter not ready"); continue; } // check if bloom filter is empty (no data inserted) if (bf->num_sectors == 0 || bf->blocks == nullptr) { string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; - printf("Bloom filter empty for %s\n", build_table.c_str()); + D_PRINTF("Bloom filter empty for %s", build_table.c_str()); // empty filter means no matches possible chunk.SetCardinality(0); return OperatorResultType::NEED_MORE_INPUT; @@ -98,8 +100,9 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat // printf("bound columns for %s - %llu\n", probe_table.c_str(), bound_column_indices[i]); // } +#ifdef DEBUG if (!bf_state.tested_hardcoded && bf_operation && bf_operation->build_table_idx == 3) { - printf("\n[HARDCODED TEST IN USE_BF] Testing if 37 title IDs can be found in bloom filter from table_3...\n"); + Printer::Print("\n[HARDCODED TEST IN USE_BF] Testing if 37 title IDs can be found in bloom filter from table_3..."); vector test_ids = { 929582, 1547687, 1669098, 1688430, 1695344, 1710439, 1779162, 1739896, @@ -126,26 +129,27 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat int found = 0; int missing = 0; - printf(" Testing %zu IDs using column index 0:\n", test_ids.size()); + Printer::PrintF(" Testing %zu IDs using column index 0:", test_ids.size()); for (size_t i = 0; i < test_ids.size(); i++) { if (test_results[i] != 0) { found++; } else { missing++; - printf(" ❌ ID %d NOT FOUND (BUG!)\n", test_ids[i]); + Printer::PrintF(" ID %d NOT FOUND (BUG!)", test_ids[i]); } } - printf(" Found: %d / %zu\n", found, test_ids.size()); - printf(" Missing: %d / %zu\n", missing, test_ids.size()); + Printer::PrintF(" Found: %d / %zu", found, test_ids.size()); + Printer::PrintF(" Missing: %d / %zu", missing, test_ids.size()); if (missing > 0) { - printf(" ❌ BLOOM FILTER LOOKUP FAILED FROM USE_BF!\n"); + Printer::Print(" BLOOM FILTER LOOKUP FAILED FROM USE_BF!"); } else { - printf(" ✅ All 37 IDs found in bloom filter from USE_BF!\n"); + Printer::Print(" All 37 IDs found in bloom filter from USE_BF!"); } bf_state.tested_hardcoded = true; } +#endif // use bound column indices for vectorized lookup vector results(row_num); @@ -180,56 +184,37 @@ OperatorResultType PhysicalUseBF::ExecuteInternal(ExecutionContext &context, Dat chunk.Slice(input, sel, result_count); } - // char ptr_str[32]; - // snprintf(ptr_str, sizeof(ptr_str), "%p", (void*)this); - // Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) BuildPipelines called", build_table.c_str(), ptr_str)); - string probe_table = bf_operation ? "table_" + std::to_string(bf_operation->probe_table_idx) : "unknown"; - string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; - // printf("[EXECUTE] USE_BF (probe=%s, this=%p, build=%s) Selected %llu rows \n", probe_table.c_str(), (void*)this, build_table.c_str(), result_count); - // chunk.Print(); + // string probe_table = bf_operation ? "table_" + std::to_string(bf_operation->probe_table_idx) : "unknown"; + // string build_table = bf_operation ? "table_" + std::to_string(bf_operation->build_table_idx) : "unknown"; + // D_PRINTF("[EXECUTE] USE_BF (probe=%s, build=%s) Selected %llu rows", + // probe_table.c_str(), build_table.c_str(), (unsigned long long)result_count); return OperatorResultType::NEED_MORE_INPUT; } void PhysicalUseBF::BuildPipelines(Pipeline ¤t, MetaPipeline &meta_pipeline) { op_state.reset(); +#ifdef DEBUG char ptr_str[32]; snprintf(ptr_str, sizeof(ptr_str), "%p", (void*)this); - // Printer::Print(StringUtil::Format("[PIPELINE] CREATE_BF (build=%s, this=%s) BuildPipelines called", build_table.c_str(), ptr_str)); string probe_table = bf_operation ? "table_" + std::to_string(bf_operation->probe_table_idx) : "unknown"; Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s, this=%s) BuildPipelines called", probe_table.c_str(), ptr_str)); +#endif auto &state = meta_pipeline.GetState(); state.AddPipelineOperator(current, *this); - Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s, this=%s) added to current pipeline as operator", probe_table.c_str(), ptr_str)); - // add dependencies on all related CREATE_BF operators +#ifdef DEBUG + Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s, this=%s) added to current pipeline as operator", probe_table.c_str(), ptr_str)); Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s) has %zu related CREATE_BF operators", probe_table.c_str(), related_create_bf_vec.size())); +#endif + // add dependencies on all related CREATE_BF operators for (size_t i = 0; i < related_create_bf_vec.size(); i++) { auto *create_bf = related_create_bf_vec[i]; - // string build_table = create_bf->bf_operation ? - // "table_" + std::to_string(create_bf->bf_operation->build_table_idx) : "unknown"; - // Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s) adding dependency #%zu on CREATE_BF (build=%s)", - // probe_table.c_str(), i, build_table.c_str())); create_bf->BuildPipelinesFromRelated(current, meta_pipeline); - - // Printer::Print(StringUtil::Format("[PIPELINE DEBUG] USE_BF (probe=%s) After adding dependency #%zu:", probe_table.c_str(), i)); - // current.PrintDependencies(); - } - - // Printer::Print(StringUtil::Format("[PIPELINE DEBUG] USE_BF (probe=%s) Final pipeline state:", probe_table.c_str())); - try { - // current.Print(); - // Printer::Print("Pipeline Dependencies"); - // current.PrintDependencies(); - } catch (...) { - // Printer::Print(" (Pipeline not yet fully initialized)"); } - // Printer::Print(""); - - // Printer::Print(StringUtil::Format("[PIPELINE] USE_BF (probe=%s) building child operator pipelines", probe_table.c_str())); // continue building child pipelines children[0].get().BuildPipelines(current, meta_pipeline); diff --git a/src/optimizer/rpt_optimizer.cpp b/src/optimizer/rpt_optimizer.cpp index 56b0e0c..38d26ee 100644 --- a/src/optimizer/rpt_optimizer.cpp +++ b/src/optimizer/rpt_optimizer.cpp @@ -12,9 +12,7 @@ #include "duckdb/common/unordered_map.hpp" #include "../operators/logical_create_bf.hpp" #include "../operators/logical_use_bf.hpp" - -#include <_assert.h> -#include +#include "debug_utils.hpp" namespace duckdb { // class LogicalCreateBF; @@ -27,6 +25,16 @@ vector RPTOptimizerContextState::ExtractOperators(LogicalOperator &pla // pass 1: collect the base tables and join operators ExtractOperatorsRecursive(plan, join_ops); + // debug: print summary of registered nodes + D_PRINT("\n=== REGISTERED NODES SUMMARY ==="); + for (const auto &[table_idx, table_info] : table_mgr.table_lookup) { + D_PRINTF(" table_idx=%llu (type=%d, cardinality=%llu)", + (unsigned long long)table_idx, (int)table_info.table_op->type, + (unsigned long long)table_info.estimated_cardinality); + } + D_PRINTF("Total registered nodes: %zu", table_mgr.table_lookup.size()); + D_PRINTF("Total join operators found: %zu\n", join_ops.size()); + // pass 2: create JoinEdges with table information return CreateJoinEdges(join_ops); } @@ -64,7 +72,21 @@ void RPTOptimizerContextState::ExtractOperatorsRecursive(LogicalOperator &plan, case LogicalOperatorType::LOGICAL_FILTER: { LogicalOperator *child = op->children[0].get(); if(child->type == LogicalOperatorType::LOGICAL_GET) { - table_mgr.AddTableOperator(child); + // register FILTER as node (not GET) - like reference impl + D_PRINTF("[NODE_REG] Registering FILTER (child=GET) for table_idx=%llu", + (unsigned long long)table_mgr.GetScalarTableIndex(op)); + table_mgr.AddTableOperator(op); + return; + } + else if (child->type == LogicalOperatorType::LOGICAL_COMPARISON_JOIN || + child->type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { + // for IN-clause optimization (MARK join), register FILTER as node + // the table_index comes from the join's left child + D_PRINTF("[NODE_REG] Registering FILTER (child=JOIN) for table_idx=%llu", + (unsigned long long)table_mgr.GetScalarTableIndex(op)); + table_mgr.AddTableOperator(op); + // still recurse into the join to collect join edges and other tables + ExtractOperatorsRecursive(*child, join_ops); return; } @@ -112,14 +134,15 @@ void RPTOptimizerContextState::ExtractOperatorsRecursive(LogicalOperator &plan, ExtractOperatorsRecursive(*op->children[0], join_ops); return; } - case LogicalOperatorType::LOGICAL_DUMMY_SCAN: - case LogicalOperatorType::LOGICAL_EXPRESSION_GET: - case LogicalOperatorType::LOGICAL_DELIM_GET: - case LogicalOperatorType::LOGICAL_GET: - case LogicalOperatorType::LOGICAL_EMPTY_RESULT: - case LogicalOperatorType::LOGICAL_CHUNK_GET: - table_mgr.AddTableOperator(op); - return; + case LogicalOperatorType::LOGICAL_DUMMY_SCAN: + case LogicalOperatorType::LOGICAL_EXPRESSION_GET: + case LogicalOperatorType::LOGICAL_DELIM_GET: + case LogicalOperatorType::LOGICAL_GET: + case LogicalOperatorType::LOGICAL_EMPTY_RESULT: + case LogicalOperatorType::LOGICAL_CHUNK_GET: + D_PRINTF("[NODE_REG] Registering base table scan, type=%d", (int)op->type); + table_mgr.AddTableOperator(op); + return; default: for (auto &child : op->children) { ExtractOperatorsRecursive(*child, join_ops); @@ -137,8 +160,8 @@ ColumnBinding RPTOptimizerContextState::ResolveColumnBinding(const ColumnBinding size_t hash = std::hash()(current.table_index) ^ (std::hash()(current.column_index) << 1); if (visited.count(hash)) { // cycle detected, return current binding - printf("WARNING: Cycle detected in rename_col_bindings for binding (%llu.%llu)\n", - current.table_index, current.column_index); + D_PRINTF("WARNING: Cycle detected in rename_col_bindings for binding (%llu.%llu)", + (unsigned long long)current.table_index, (unsigned long long)current.column_index); break; } visited.insert(hash); @@ -194,8 +217,8 @@ vector RPTOptimizerContextState::CreateJoinEdges(vector RPTOptimizerContextState::LargestRoot(vector &edges) } if (!best_edge) { - printf("Warning - Disconnected components found. MST incomplete.\n"); + D_PRINT("Warning - Disconnected components found. MST incomplete."); break; } @@ -272,7 +295,7 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) } if (table_mgr.table_ops.empty()) { - printf("ERROR: BuildRootedTree called with empty table_ops\n"); + D_PRINT("ERROR: BuildRootedTree called with empty table_ops"); return nullptr; } @@ -292,7 +315,7 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) } if (!found_root) { - printf("ERROR: No valid root table found\n"); + D_PRINT("ERROR: No valid root table found"); return nullptr; } @@ -305,7 +328,7 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) // verify root node was created if (table_to_node.find(root_table_idx) == table_to_node.end() || !table_to_node[root_table_idx]) { - printf("ERROR: Failed to create root node for table %llu\n", root_table_idx); + D_PRINTF("ERROR: Failed to create root node for table %llu", (unsigned long long)root_table_idx); // cleanup allocated nodes for (auto &pair : table_to_node) { delete pair.second; @@ -334,7 +357,7 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) // check if current node exists if (table_to_node.find(current) == table_to_node.end() || !table_to_node[current]) { - printf("ERROR: Node for table %llu not found in table_to_node\n", current); + D_PRINTF("ERROR: Node for table %llu not found in table_to_node", (unsigned long long)current); continue; } @@ -345,7 +368,7 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) if (visited.count(neighbor_idx) == 0) { // verify neighbor node exists if (table_to_node.find(neighbor_idx) == table_to_node.end() || !table_to_node[neighbor_idx]) { - printf("ERROR: Child node for table %llu not found\n", neighbor_idx); + D_PRINTF("ERROR: Child node for table %llu not found", (unsigned long long)neighbor_idx); continue; } @@ -366,11 +389,14 @@ TreeNode* RPTOptimizerContextState::BuildRootedTree(vector &mst_edges) return table_to_node[root_table_idx]; } -void RPTOptimizerContextState::DebugPrintGraph(const vector &edges) const { +void RPTOptimizerContextState::DebugPrintGraph([[maybe_unused]] const vector &edges) const { +#ifdef DEBUG // Debug: Print all tables - printf("=== TABLE INFORMATION ===\n"); + Printer::Print("=== TABLE INFORMATION ==="); for (const auto &table_info : table_mgr.table_ops) { - printf("Table %llu: cardinality=%llu\n", table_info.table_idx, table_info.estimated_cardinality); + Printer::PrintF("Table %llu: cardinality=%llu", + (unsigned long long)table_info.table_idx, + (unsigned long long)table_info.estimated_cardinality); } // Find largest table @@ -382,66 +408,77 @@ void RPTOptimizerContextState::DebugPrintGraph(const vector &edges) co largest_table_idx = table_info.table_idx; } } - printf("Largest table: %llu (cardinality=%llu)\n\n", largest_table_idx, max_cardinality); + Printer::PrintF("Largest table: %llu (cardinality=%llu)\n", + (unsigned long long)largest_table_idx, (unsigned long long)max_cardinality); // Debug: Print all join edges - printf("=== ALL JOIN EDGES ===\n"); + Printer::Print("=== ALL JOIN EDGES ==="); for (size_t i = 0; i < edges.size(); i++) { const auto &edge = edges[i]; - printf("Edge %zu: %llu <-> %llu (weight=%llu, type=%d)\n", - i, edge.table_a, edge.table_b, edge.weight, (int)edge.join_type); + Printer::PrintF("Edge %zu: %llu <-> %llu (weight=%llu, type=%d)", + i, (unsigned long long)edge.table_a, (unsigned long long)edge.table_b, + (unsigned long long)edge.weight, (int)edge.join_type); // Print column bindings - printf(" Columns A: "); + string cols_a = " Columns A: "; for (const auto &col : edge.join_columns_a) { - printf("(%llu.%llu) ", col.table_index, col.column_index); + cols_a += "(" + std::to_string(col.table_index) + "." + std::to_string(col.column_index) + ") "; } - printf("\n Columns B: "); + Printer::Print(cols_a); + + string cols_b = " Columns B: "; for (const auto &col : edge.join_columns_b) { - printf("(%llu.%llu) ", col.table_index, col.column_index); + cols_b += "(" + std::to_string(col.table_index) + "." + std::to_string(col.column_index) + ") "; } - printf("\n"); + Printer::Print(cols_b); } - printf("\n"); + Printer::Print(""); +#endif } -void RPTOptimizerContextState::DebugPrintMST(const vector &mst_edges, const vector &bf_operations) { - printf("=== MST EDGES ===\n"); +void RPTOptimizerContextState::DebugPrintMST([[maybe_unused]] const vector &mst_edges, + [[maybe_unused]] const vector &bf_operations) { +#ifdef DEBUG + Printer::Print("=== MST EDGES ==="); for (size_t i = 0; i < mst_edges.size(); i++) { const auto &edge = mst_edges[i]; - printf("MST Edge %zu: %llu <-> %llu (weight=%llu)\n", - i, edge.table_a, edge.table_b, edge.weight); + Printer::PrintF("MST Edge %zu: %llu <-> %llu (weight=%llu)", + i, (unsigned long long)edge.table_a, (unsigned long long)edge.table_b, + (unsigned long long)edge.weight); } - printf("\n"); + Printer::Print(""); - printf("=== BLOOM FILTER OPERATIONS ===\n"); + Printer::Print("=== BLOOM FILTER OPERATIONS ==="); for (size_t i = 0; i < bf_operations.size(); i++) { const auto &bf_op = bf_operations[i]; if (bf_op.is_create) { // CREATE operation - printf("BF Op %zu: CREATE_BF on table %llu\n", i, bf_op.build_table_idx); - printf(" Build columns: "); + Printer::PrintF("BF Op %zu: CREATE_BF on table %llu", i, (unsigned long long)bf_op.build_table_idx); + string cols = " Build columns: "; for (const auto &col : bf_op.build_columns) { - printf("(%llu.%llu) ", col.table_index, col.column_index); + cols += "(" + std::to_string(col.table_index) + "." + std::to_string(col.column_index) + ") "; } - printf("\n"); + Printer::Print(cols); } else { // USE operation - printf("BF Op %zu: USE_BF on table %llu (using BF from table %llu)\n", - i, bf_op.probe_table_idx, bf_op.build_table_idx); - printf(" Build columns: "); + Printer::PrintF("BF Op %zu: USE_BF on table %llu (using BF from table %llu)", + i, (unsigned long long)bf_op.probe_table_idx, (unsigned long long)bf_op.build_table_idx); + string build_cols = " Build columns: "; for (const auto &col : bf_op.build_columns) { - printf("(%llu.%llu) ", col.table_index, col.column_index); + build_cols += "(" + std::to_string(col.table_index) + "." + std::to_string(col.column_index) + ") "; } - printf("\n Probe columns: "); + Printer::Print(build_cols); + + string probe_cols = " Probe columns: "; for (const auto &col : bf_op.probe_columns) { - printf("(%llu.%llu) ", col.table_index, col.column_index); + probe_cols += "(" + std::to_string(col.table_index) + "." + std::to_string(col.column_index) + ") "; } - printf("\n"); + Printer::Print(probe_cols); } } - printf("\n"); + Printer::Print(""); +#endif } std::pair>, @@ -453,7 +490,7 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst // check if tree building failed if (!root) { - printf("ERROR: BuildRootedTree returned nullptr, returning empty modifications\n"); + D_PRINT("ERROR: BuildRootedTree returned nullptr, returning empty modifications"); return {{}, {}}; } @@ -469,7 +506,7 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst while (front < queue.size()) { TreeNode* node = queue[front++]; if (!node) { - printf("ERROR: Null node encountered during BFS\n"); + D_PRINT("ERROR: Null node encountered during BFS"); continue; } @@ -480,7 +517,7 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst if (child) { queue.push_back(child); } else { - printf("ERROR: Null child node encountered\n"); + D_PRINT("ERROR: Null child node encountered"); } } } @@ -496,19 +533,20 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst for (int level = max_level; level >= 1; level--) { for (TreeNode* child_node : nodes_by_level[level]) { if (!child_node) { - printf("ERROR: Null child_node at level %d\n", level); + D_PRINTF("ERROR: Null child_node at level %d", level); continue; } TreeNode* parent_node = child_node->parent; if (!parent_node) { - printf("ERROR: Null parent_node for table %llu at level %d\n", child_node->table_idx, level); + D_PRINTF("ERROR: Null parent_node for table %llu at level %d", + (unsigned long long)child_node->table_idx, level); continue; } JoinEdge* edge = child_node->edge_to_parent; if (!edge) { - printf("ERROR: Null edge_to_parent for table %llu\n", child_node->table_idx); + D_PRINTF("ERROR: Null edge_to_parent for table %llu", (unsigned long long)child_node->table_idx); continue; } @@ -544,26 +582,26 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst forward_bf_ops[parent_node->table_op].push_back(use_op); } } - printf("\n"); // step 4: backward pass - top-down (root to leaves) // process levels from 1 to max_level for (int level = 1; level <= max_level; level++) { for (TreeNode* child_node : nodes_by_level[level]) { if (!child_node) { - printf("ERROR: Null child_node at level %d\n", level); + D_PRINTF("ERROR: Null child_node at level %d", level); continue; } TreeNode* parent_node = child_node->parent; if (!parent_node) { - printf("ERROR: Null parent_node for table %llu at level %d\n", child_node->table_idx, level); + D_PRINTF("ERROR: Null parent_node for table %llu at level %d", + (unsigned long long)child_node->table_idx, level); continue; } JoinEdge* edge = child_node->edge_to_parent; if (!edge) { - printf("ERROR: Null edge_to_parent for table %llu\n", child_node->table_idx); + D_PRINTF("ERROR: Null edge_to_parent for table %llu", (unsigned long long)child_node->table_idx); continue; } @@ -599,7 +637,6 @@ RPTOptimizerContextState::GenerateStageModifications(const vector &mst backward_bf_ops[child_node->table_op].push_back(use_op); } } - printf("\n"); return {std::move(forward_bf_ops), std::move(backward_bf_ops)}; } @@ -805,8 +842,8 @@ void RPTOptimizerContextState::LinkUseBFToCreateBF(LogicalOperator *plan) { use_bf->related_create_bf = it->second; it->second->related_use_bf.push_back(use_bf); } else { - printf("[LINK] WARNING: No matching CREATE_BF found for USE_BF (probe=table_%llu, build=table_%llu)\n", - use_bf->bf_operation.probe_table_idx, build_table_idx); + D_PRINTF("[LINK] WARNING: No matching CREATE_BF found for USE_BF (probe=table_%llu, build=table_%llu)", + (unsigned long long)use_bf->bf_operation.probe_table_idx, (unsigned long long)build_table_idx); } } } @@ -831,7 +868,7 @@ unique_ptr RPTOptimizerContextState::Optimize(unique_ptr edges = ExtractOperators(*plan); - printf("Edges size: %lu\n", edges.size()); + D_PRINTF("Edges size: %zu", edges.size()); if (edges.size() <= 1) { return plan; } diff --git a/src/optimizer/table_manager.cpp b/src/optimizer/table_manager.cpp index fc3759e..d224ee9 100644 --- a/src/optimizer/table_manager.cpp +++ b/src/optimizer/table_manager.cpp @@ -1,7 +1,27 @@ #include "table_manager.hpp" +#include "duckdb/planner/operator/logical_get.hpp" +#include "duckdb/planner/operator/logical_comparison_join.hpp" +#include "debug_utils.hpp" namespace duckdb { +// helper to get operator type name for debug +static const char* GetOpTypeName(LogicalOperatorType type) { + switch (type) { + case LogicalOperatorType::LOGICAL_GET: return "GET"; + case LogicalOperatorType::LOGICAL_FILTER: return "FILTER"; + case LogicalOperatorType::LOGICAL_PROJECTION: return "PROJECTION"; + case LogicalOperatorType::LOGICAL_COMPARISON_JOIN: return "COMPARISON_JOIN"; + case LogicalOperatorType::LOGICAL_DELIM_JOIN: return "DELIM_JOIN"; + case LogicalOperatorType::LOGICAL_AGGREGATE_AND_GROUP_BY: return "AGGREGATE"; + case LogicalOperatorType::LOGICAL_WINDOW: return "WINDOW"; + case LogicalOperatorType::LOGICAL_UNION: return "UNION"; + case LogicalOperatorType::LOGICAL_CHUNK_GET: return "CHUNK_GET"; + case LogicalOperatorType::LOGICAL_DELIM_GET: return "DELIM_GET"; + default: return "OTHER"; + } +} + void TableManager::AddTable(const TableInfo &table) { table_lookup[table.table_idx] = table; table_ops.push_back(table); @@ -20,7 +40,25 @@ idx_t TableManager::GetScalarTableIndex(LogicalOperator *op) { return op->GetTableIndex()[0]; } case LogicalOperatorType::LOGICAL_FILTER: { - return GetScalarTableIndex(op->children[0].get()); + // handle FILTER cases like reference impl's GetTableIndexinFilter + LogicalOperator *child = op->children[0].get(); + if (child->type == LogicalOperatorType::LOGICAL_GET) { + // FILTER → GET: get table index from GET + return child->Cast().GetTableIndex()[0]; + } else if (child->type == LogicalOperatorType::LOGICAL_COMPARISON_JOIN || + child->type == LogicalOperatorType::LOGICAL_DELIM_JOIN) { + // FILTER → JOIN (e.g., MARK join for IN clause): get table index from join's left child + LogicalOperator *join_left = child->children[0].get(); + if (join_left->type == LogicalOperatorType::LOGICAL_GET) { + return join_left->Cast().GetTableIndex()[0]; + } + // recurse further if needed + return GetScalarTableIndex(join_left); + } else if (child->type == LogicalOperatorType::LOGICAL_AGGREGATE_AND_GROUP_BY) { + return child->GetTableIndex()[0]; + } + // default: recurse into child + return GetScalarTableIndex(child); } case LogicalOperatorType::LOGICAL_AGGREGATE_AND_GROUP_BY: { return op->GetTableIndex()[1]; @@ -31,15 +69,21 @@ idx_t TableManager::GetScalarTableIndex(LogicalOperator *op) { } void TableManager::AddTableOperator(LogicalOperator *op) { - // op->estimated_cardinality = op->EstimateCardinality(context); TableInfo tbl_info; tbl_info.estimated_cardinality = op->estimated_cardinality; tbl_info.table_idx = GetScalarTableIndex(op); table_id table_idx = tbl_info.table_idx; tbl_info.table_op = op; + if (table_idx != std::numeric_limits::max() && table_lookup.find(table_idx) == table_lookup.end()) { + D_PRINTF("[NODE_REG] AddTableOperator: type=%s, table_idx=%llu, cardinality=%llu", + GetOpTypeName(op->type), (unsigned long long)table_idx, + (unsigned long long)tbl_info.estimated_cardinality); table_lookup[table_idx] = tbl_info; table_ops.push_back(tbl_info); + } else if (table_idx != std::numeric_limits::max()) { + D_PRINTF("[NODE_REG] AddTableOperator SKIPPED (already exists): type=%s, table_idx=%llu", + GetOpTypeName(op->type), (unsigned long long)table_idx); } } diff --git a/src/transfer_graph_manager.cpp b/src/transfer_graph_manager.cpp index 192bfda..71f528f 100644 --- a/src/transfer_graph_manager.cpp +++ b/src/transfer_graph_manager.cpp @@ -5,6 +5,7 @@ #include "predicate_transfer_optimization.hpp" #include "duckdb/planner/expression/bound_columnref_expression.hpp" #include "duckdb/planner/operator/logical_get.hpp" +#include "debug_utils.hpp" #include @@ -48,56 +49,58 @@ static void UnionBindings(const ColumnBinding &a, const ColumnBinding &b, const } bool TransferGraphManager::Build(LogicalOperator &plan) { - printf("\n=== BUILD TRANSFER GRAPH MANAGER ===\n"); + D_PRINT("\n=== BUILD TRANSFER GRAPH MANAGER ==="); // 1. Extract all operators, including table operators and join operators const vector> joins = table_operator_manager.ExtractOperators(plan); - printf("1. extracted operators: %zu table operators, %zu join operators\n", - table_operator_manager.table_operators.size(), joins.size()); + D_PRINTF("1. extracted operators: %zu table operators, %zu join operators", + table_operator_manager.table_operators.size(), joins.size()); if (table_operator_manager.table_operators.size() < 2) { - printf("not enough table operators (< 2), skipping\n"); + D_PRINT("not enough table operators (< 2), skipping"); return false; } // print table operators - printf("table operators:\n"); + D_PRINT("table operators:"); for (auto &pair : table_operator_manager.table_operators) { auto &op = *pair.second; - printf(" table_idx=%llu, type=%s, cardinality=%llu\n", - pair.first, LogicalOperatorToString(op.type).c_str(), op.estimated_cardinality); + D_PRINTF(" table_idx=%llu, type=%s, cardinality=%llu", + (unsigned long long)pair.first, LogicalOperatorToString(op.type).c_str(), + (unsigned long long)op.estimated_cardinality); } // 2. Getting graph edges information from join operators ExtractEdgesInfo(joins); - printf("2. extracted edges: neighbor_matrix size=%zu\n", neighbor_matrix.size()); + D_PRINTF("2. extracted edges: neighbor_matrix size=%zu", neighbor_matrix.size()); if (neighbor_matrix.empty()) { - printf("no edges extracted, skipping\n"); + D_PRINT("no edges extracted, skipping"); return false; } // print edge information - printf("edge information:\n"); + D_PRINT("edge information:"); for (auto &pair : neighbor_matrix) { idx_t table1 = pair.first; for (auto &edge_pair : pair.second) { idx_t table2 = edge_pair.first; auto &edge = edge_pair.second; - printf(" edge: table_%llu <-> table_%llu, protect_left=%s, protect_right=%s\n", - table1, table2, edge->protect_left ? "true" : "false", edge->protect_right ? "true" : "false"); + D_PRINTF(" edge: table_%llu <-> table_%llu, protect_left=%s, protect_right=%s", + (unsigned long long)table1, (unsigned long long)table2, + edge->protect_left ? "true" : "false", edge->protect_right ? "true" : "false"); } } // 3. unfiltered table only receives bloom filters, they will not generate bloom filters. // SkipUnfilteredTable(joins); - printf("3. after SkipUnfilteredTable: neighbor_matrix size=%zu\n", neighbor_matrix.size()); + D_PRINTF("3. after SkipUnfilteredTable: neighbor_matrix size=%zu", neighbor_matrix.size()); // 4. create the transfer graph - printf("4. calling CreateTransferPlanUpdated()\n"); + D_PRINT("4. calling CreateTransferPlanUpdated()"); CreateTransferPlanUpdated(); - printf("=== BUILD COMPLETE ===\n\n"); + D_PRINT("=== BUILD COMPLETE ===\n"); return true; } @@ -324,16 +327,16 @@ void TransferGraphManager::ClassifyTables() { if (table->type == LogicalOperatorType::LOGICAL_GET) { auto &get = table->Cast(); if (get.table_filters.filters.empty()) { - std::cout << "table_" << id << " has no table filters\n"; - printf(" table_%llu marked as UNFILTERED (no table filters)\n", id); + D_PRINTF("table_%llu has no table filters", (unsigned long long)id); + D_PRINTF(" table_%llu marked as UNFILTERED (no table filters)", (unsigned long long)id); unfiltered_table.insert(id); continue; } else { - printf(" table_%llu has %zu table filters\n", id, get.table_filters.filters.size()); + D_PRINTF(" table_%llu has %zu table filters", (unsigned long long)id, get.table_filters.filters.size()); } } - std::cout<< "Adding table_" << id << " to filtered table\n"; + D_PRINTF("Adding table_%llu to filtered table", (unsigned long long)id); // last, it is a filtered table filtered_table.insert(id); } @@ -348,11 +351,10 @@ void TransferGraphManager::SkipUnfilteredTable(const vector &sorted_ int prior_flag = static_cast(table_operator_manager.table_operators.size()) - 1; idx_t root = std::numeric_limits::max(); - printf("Sorted nodes order - descending order: \n"); +#ifdef DEBUG + Printer::Print("Sorted nodes order - descending order:"); + string nodes_str = " "; for (auto it = sorted_nodes.rbegin(); it != sorted_nodes.rend(); ++it) { auto &node = *it; idx_t table_idx = table_operator_manager.GetScalarTableIndex(node); - std::cout << table_idx << " " ; + nodes_str += std::to_string(table_idx) + " "; } + Printer::Print(nodes_str); +#endif root = table_operator_manager.GetScalarTableIndex(sorted_nodes.back()); - std::cout << "\nRoot = " << root << std::endl; + D_PRINTF("Root = %llu", (unsigned long long)root); // Try to choose the largest filtered or intermediate table as the root // for (auto it = sorted_nodes.rbegin(); it != sorted_nodes.rend(); ++it) { @@ -554,9 +560,9 @@ void TransferGraphManager::LargestRootUpdated(vector &sorted_ root = table_operator_manager.GetScalarTableIndex(node); } - printf("LargestRootUpdated: selected root = table_%llu\n", root); - printf("filtered_table.size()=%zu, intermediate_table.size()=%zu\n", - filtered_table.size(), intermediate_table.size()); + D_PRINTF("LargestRootUpdated: selected root = table_%llu", (unsigned long long)root); + D_PRINTF("filtered_table.size()=%zu, intermediate_table.size()=%zu", + filtered_table.size(), intermediate_table.size()); // Initialize nodes for (auto &entry : table_operator_manager.table_operators) { @@ -588,8 +594,8 @@ void TransferGraphManager::LargestRootUpdated(vector &sorted_ break; } - printf(" spanning tree edge: table_%llu <-> table_%llu\n", - selected_edge.first, selected_edge.second); + D_PRINTF(" spanning tree edge: table_%llu <-> table_%llu", + (unsigned long long)selected_edge.first, (unsigned long long)selected_edge.second); auto &edge = neighbor_matrix[selected_edge.first][selected_edge.second]; selected_edges.emplace_back(std::move(edge)); @@ -661,10 +667,10 @@ void TransferGraphManager::CreateOriginTransferPlan() { } void TransferGraphManager::CreateTransferPlanUpdated() { - printf("\n=== CREATE TRANSFER PLAN UPDATED ===\n"); + D_PRINT("\n=== CREATE TRANSFER PLAN UPDATED ==="); auto saved_nodes = table_operator_manager.table_operators; - printf("calling LargestRootUpdated to build spanning tree...\n"); + D_PRINT("calling LargestRootUpdated to build spanning tree..."); while (!table_operator_manager.table_operators.empty()) { LargestRootUpdated(table_operator_manager.sorted_table_operators); @@ -672,21 +678,23 @@ void TransferGraphManager::CreateTransferPlanUpdated() { } table_operator_manager.table_operators = saved_nodes; - printf("selected_edges size: %zu\n", selected_edges.size()); - printf("transfer_order size: %zu\n", transfer_order.size()); + D_PRINTF("selected_edges size: %zu", selected_edges.size()); + D_PRINTF("transfer_order size: %zu", transfer_order.size()); - printf("transfer_order: "); +#ifdef DEBUG + string order_str = "transfer_order: "; for (auto *op : transfer_order) { auto table_idx = TableOperatorManager::GetScalarTableIndex(op); - printf("table_%llu ", table_idx); + order_str += "table_" + std::to_string(table_idx) + " "; } - printf("\n"); + Printer::Print(order_str); +#endif - printf("processing selected edges to build transfer graph...\n"); + D_PRINT("processing selected edges to build transfer graph..."); for (size_t i = 0; i < selected_edges.size(); i++) { auto &edge = selected_edges[i]; if (!edge) { - printf(" edge %zu: null, skipping\n", i); + D_PRINTF(" edge %zu: null, skipping", i); continue; } @@ -705,14 +713,15 @@ void TransferGraphManager::CreateTransferPlanUpdated() { auto protect_left = edge->protect_left; auto protect_right = edge->protect_right; - printf(" edge %zu: table_%llu (cardinality_order=%d) <-> table_%llu (cardinality_order=%d)\n", - i, left_idx, left_node->cardinality_order, right_idx, right_node->cardinality_order); - printf(" protect_left=%s, protect_right=%s\n", - protect_left ? "true" : "false", protect_right ? "true" : "false"); + D_PRINTF(" edge %zu: table_%llu (cardinality_order=%d) <-> table_%llu (cardinality_order=%d)", + i, (unsigned long long)left_idx, left_node->cardinality_order, + (unsigned long long)right_idx, right_node->cardinality_order); + D_PRINTF(" protect_left=%s, protect_right=%s", + protect_left ? "true" : "false", protect_right ? "true" : "false"); // smaller table is in the left if (left_node->cardinality_order > right_node->cardinality_order) { - printf(" swapping order: left becomes right, right becomes left\n"); + D_PRINT(" swapping order: left becomes right, right becomes left"); std::swap(left_node, right_node); std::swap(left_cols, right_cols); std::swap(protect_left, protect_right); @@ -720,18 +729,20 @@ void TransferGraphManager::CreateTransferPlanUpdated() { // forward: from the smaller to the larger if (!protect_right) { - printf(" adding FORWARD edges: table_%llu -> table_%llu\n", left_node->id, right_node->id); + D_PRINTF(" adding FORWARD edges: table_%llu -> table_%llu", + (unsigned long long)left_node->id, (unsigned long long)right_node->id); left_node->Add(right_node->id, {left_cols}, {right_cols}, {type}, true, false); right_node->Add(left_node->id, {left_cols}, {right_cols}, {type}, true, true); } else { - printf(" skipping forward edges (protect_right=true)\n"); + D_PRINT(" skipping forward edges (protect_right=true)"); } // backward: from the larger to the smaller if (!protect_left) { auto &group = table_groups[right_cols]; if (group) { - printf(" adding BACKWARD edges with GROUP LEADER: table_%llu -> group_leader_%llu\n", left_node->id, group->leader_id); + D_PRINTF(" adding BACKWARD edges with GROUP LEADER: table_%llu -> group_leader_%llu", + (unsigned long long)left_node->id, (unsigned long long)group->leader_id); auto group_leader = group->leader_id; auto &leader_cols = group->leader_column_binding; auto leader = transfer_graph[group_leader].get(); @@ -739,40 +750,50 @@ void TransferGraphManager::CreateTransferPlanUpdated() { left_node->Add(group_leader, {left_cols}, {leader_cols}, {type}, false, true); leader->Add(left_node->id, {left_cols}, {leader_cols}, {type}, false, false); } else { - printf(" adding BACKWARD edges: table_%llu -> table_%llu\n", left_node->id, right_node->id); + D_PRINTF(" adding BACKWARD edges: table_%llu -> table_%llu", + (unsigned long long)left_node->id, (unsigned long long)right_node->id); left_node->Add(right_node->id, {left_cols}, {right_cols}, {type}, false, true); right_node->Add(left_node->id, {left_cols}, {right_cols}, {type}, false, false); } } else { - printf(" skipping backward edges (protect_left=true)\n"); + D_PRINT(" skipping backward edges (protect_left=true)"); } } // print final transfer graph - printf("\nfinal transfer graph:\n"); +#ifdef DEBUG + Printer::Print("\nfinal transfer graph:"); for (auto &pair : transfer_graph) { auto &node = *pair.second; - printf(" table_%llu (cardinality_order=%d):\n", node.id, node.cardinality_order); - printf(" forward out edges: "); + Printer::PrintF(" table_%llu (cardinality_order=%d):", (unsigned long long)node.id, node.cardinality_order); + + string fwd_out = " forward out edges: "; for (auto &edge : node.forward_stage_edges.out) { - printf("->%llu ", edge->destination); + fwd_out += "->" + std::to_string(edge->destination) + " "; } - printf("\n forward in edges: "); + Printer::Print(fwd_out); + + string fwd_in = " forward in edges: "; for (auto &edge : node.forward_stage_edges.in) { - printf("<-%llu ", edge->destination); + fwd_in += "<-" + std::to_string(edge->destination) + " "; } - printf("\n backward out edges: "); + Printer::Print(fwd_in); + + string bwd_out = " backward out edges: "; for (auto &edge : node.backward_stage_edges.out) { - printf("->%llu ", edge->destination); + bwd_out += "->" + std::to_string(edge->destination) + " "; } - printf("\n backward in edges: "); + Printer::Print(bwd_out); + + string bwd_in = " backward in edges: "; for (auto &edge : node.backward_stage_edges.in) { - printf("<-%llu ", edge->destination); + bwd_in += "<-" + std::to_string(edge->destination) + " "; } - printf("\n"); + Printer::Print(bwd_in); } - printf("=== END CREATE TRANSFER PLAN UPDATED ===\n\n"); + Printer::Print("=== END CREATE TRANSFER PLAN UPDATED ===\n"); +#endif } pair TransferGraphManager::FindEdge(const unordered_set &constructed_set,