Skip to content

Conversation

@haohuaijin
Copy link
Collaborator

@haohuaijin haohuaijin commented Sep 22, 2025

related to #8173

after this pr, the subquery in below sql can use index optimzier to speed up

select kubernetes_namespace_name,
  array_agg(distinct kubernetes_container_name) as container_name
from default
where log like '%zinc%'
and kubernetes_namespace_name in (
    select distinct kubernetes_namespace_name
    from default
    order by kubernetes_namespace_name limit 10000)
group by kubernetes_namespace_name
order by kubernetes_namespace_name
limit 10
  • add more test case
  • make the logical more clearly

@github-actions
Copy link
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 3 🔵🔵🔵⚪⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Robustness

TableNameVisitor downcasts to NewEmptyExec using unwrap without checking type safety beyond name comparison; if name collides or implementation changes, this can panic. Consider using downcast_ref check first and avoiding reliance on string name equality.

fn f_up(&mut self, node: &'n Self::Node) -> Result<TreeNodeRecursion> {
    let name = node.name();
    if name == "NewEmptyExec" {
        let table = node.as_any().downcast_ref::<NewEmptyExec>().unwrap();
        self.table_name = Some(TableReference::from(table.name()));
        Ok(TreeNodeRecursion::Continue)
    } else {
        Ok(TreeNodeRecursion::Continue)
    }
Logic Coverage

LeaderIndexOptimizer only extracts index fields for plans under SortPreservingMergeExec; ensure TopN/Distinct patterns not wrapped by SPM are still optimized or explicitly handled, otherwise optimization may be skipped.

fn f_up(&mut self, plan: Self::Node) -> Result<Transformed<Self::Node>> {
    if plan
        .as_any()
        .downcast_ref::<SortPreservingMergeExec>()
        .is_some()
    {
        // Get the index fields of the underlying table
        let mut visitor = TableNameVisitor::new();
        plan.visit(&mut visitor)?;
        let Some(table_name) = visitor.table_name else {
            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Stop));
        };
        let index_fields = self
            .index_fields
            .get(&table_name)
            .cloned()
            .unwrap_or(HashSet::new());

        // check if the query is simple topn or simple distinct
        if let Some(index_optimize_mode) =
            is_simple_topn(Arc::clone(&plan), index_fields.clone())
        {
            // Check for SimpleTopN
            let mut rewriter = IndexOptimizerRewrite::new(index_optimize_mode);
            let plan = plan.rewrite(&mut rewriter)?.data;
            return Ok(Transformed::new(plan, true, TreeNodeRecursion::Stop));
        } else if let Some(index_optimize_mode) =
            is_simple_distinct(Arc::clone(&plan), index_fields.clone())
        {
            // Check for SimpleDistinct
            let mut rewriter = IndexOptimizerRewrite::new(index_optimize_mode);
            let plan = plan.rewrite(&mut rewriter)?.data;
            return Ok(Transformed::new(plan, true, TreeNodeRecursion::Stop));
        } else {
            return Ok(Transformed::new(plan, false, TreeNodeRecursion::Continue));
        }
    }
Keying Consistency

index_fields HashMap is keyed by TableReference built from visitor; ensure keys created during schema iteration (stream_name) match visitor-produced TableReference (format/normalization). Mismatch will lead to empty index_fields and missed optimizations.

// should after remote scan
let mut index_fields: HashMap<TableReference, HashSet<String>> = HashMap::new();
for (stream_name, schema) in sql.schemas.iter() {
    let stream_settings = infra::schema::unwrap_stream_settings(schema.schema());
    let idx_fields = get_stream_setting_index_fields(&stream_settings);
    let idx_fields = idx_fields
        .into_iter()
        .filter_map(|index_field| {
            if schema.contains_field(&index_field) {
                Some(index_field)
            } else {
                None
            }
        })
        .collect::<HashSet<_>>();
    index_fields.insert(stream_name.clone(), idx_fields);
}
rules.push(Arc::new(LeaderIndexOptimizerRule::new(index_fields)) as _);

Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Summary

This PR extends the index optimizer functionality to support multi-stream queries (joins and subqueries) in OpenObserve. The changes transform the index optimizer from handling single-stream queries to processing multiple streams by collecting index fields from all schemas in the SQL query.

The key architectural change is in mod.rs where the single-stream constraint (if sql.stream_names.len() == 1) is removed and replaced with logic that builds a HashMap<TableReference, HashSet<String>> mapping each stream to its index fields. This allows the optimizer to maintain per-table index field information rather than using a global set.

In the physical optimizer module, the LeaderIndexOptimizerRule is updated to work with this new per-table index field structure. A new TableNameVisitor is introduced to extract table names from execution plans by traversing the plan tree and identifying NewEmptyExec nodes, which represent the actual data sources.

The optimization logic now looks up the appropriate index fields for each specific table during query execution, enabling proper index-based optimizations for complex multi-stream scenarios. This change integrates with the existing DataFusion optimizer framework and maintains compatibility with single-stream queries while extending capabilities to joins and subqueries.

Confidence score: 3/5

  • This PR has potential issues that need attention before merging safely
  • Score reflects concerns about error handling and the robustness of the table name extraction logic
  • Pay close attention to the physical optimizer module, particularly the TableNameVisitor implementation

Context used:

Context - Avoid using expect with potentially failing operations; instead, handle the None case to prevent panics. (link)

2 files reviewed, 2 comments

Edit Code Review Bot Settings | Greptile

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent panic on downcast

Avoid unwrap() on the downcast to prevent a panic if the node name and type diverge.
Use conditional downcast and only set table_name when the cast succeeds.

src/service/search/datafusion/optimizer/physical_optimizer/index_optimizer/mod.rs [286-293]

-let name = node.name();
-if name == "NewEmptyExec" {
-    let table = node.as_any().downcast_ref::<NewEmptyExec>().unwrap();
-    self.table_name = Some(TableReference::from(table.name()));
-    Ok(TreeNodeRecursion::Continue)
-} else {
-    Ok(TreeNodeRecursion::Continue)
+if node.name() == "NewEmptyExec" {
+    if let Some(table) = node.as_any().downcast_ref::<NewEmptyExec>() {
+        self.table_name = Some(TableReference::from(table.name()));
+    }
 }
+Ok(TreeNodeRecursion::Continue)
Suggestion importance[1-10]: 8

__

Why: Replacing unwrap() with a conditional downcast removes a potential runtime panic if the node name and actual type diverge, improving robustness without altering behavior. This directly matches the new code hunk and is a meaningful safety fix.

Medium
Preserve optimizer traversal

Don't stop traversal when table name is absent; this can suppress other
optimizations. Continue recursion so other nodes can still be optimized.

src/service/search/datafusion/optimizer/physical_optimizer/index_optimizer/mod.rs [210-213]

 let Some(table_name) = visitor.table_name else {
-    return Ok(Transformed::new(plan, false, TreeNodeRecursion::Stop));
+    return Ok(Transformed::new(plan, false, TreeNodeRecursion::Continue));
 };
Suggestion importance[1-10]: 6

__

Why: Continuing traversal when no table name is found can allow other optimizations to apply, which is a reasonable behavior change with low risk. It aligns with the control-flow in the new hunk and could improve optimization coverage, though impact is moderate.

Low
General
Reduce unnecessary allocations

Avoid allocating a new empty set on every miss to prevent unnecessary cloning and
temporary allocations. Return a reference to an existing empty set instead and only
clone when needed.

src/service/search/datafusion/optimizer/physical_optimizer/index_optimizer/mod.rs [213-217]

+let empty: HashSet<String> = HashSet::new();
 let index_fields = self
     .index_fields
     .get(&table_name)
-    .cloned()
-    .unwrap_or(HashSet::new());
+    .unwrap_or(&empty)
+    .clone();
Suggestion importance[1-10]: 5

__

Why: The change avoids constructing a new HashSet on miss and clones only when needed, which is a minor efficiency/readability improvement. However, it introduces a local empty set whose reference must not outlive the scope; here it's cloned immediately so it's safe but only marginally beneficial.

Low

@haohuaijin haohuaijin merged commit c2b7b2c into main Sep 23, 2025
28 of 29 checks passed
@haohuaijin haohuaijin deleted the feat-index-optimizer-multi-stream branch September 23, 2025 10:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants