[Feature] Add PPL collect command #5605

Draft
noCharger wants to merge 1 commit into opensearch-project:main from noCharger:feature/ppl-collect

@noCharger

Collaborator

Terminal pass-through write: appends pipeline rows to a pre-existing index and
returns them as the result.

  • Grammar (ppl / language-grammar / async-query-core) + Collect AST + AstBuilder + anonymizer
  • visitCollect builds LogicalTableSpool; EnumerableOpenSearchTableSpool + converter rule do the eager-drain batched-bulk write (429 retry/backoff) with pass-through output
  • Options: source/host/sourcetype/marker stamps + testmode dry-run
  • Plan-time safety: destination pre-existence + dot/hidden-index refusal
  • Tests: CalcitePPLCollectTest, CollectWriteStrategyTest, NewAddedCommandsIT, CalcitePPLCollectIT
  • User doc: docs/user/ppl/cmd/collect.md

Description

#5596

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • New functionality includes testing.
  • New functionality has been documented.
  • New functionality has javadoc added.
  • New functionality has a user manual doc added.
  • New PPL command checklist all confirmed.
  • API changes companion pull request created.
  • Commits are signed per the DCO using --signoff or -s.
  • Public documentation issue/PR created.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions

github-actions Bot commented Jul 2, 2026

Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 480ccdd.

Path | Line | Severity | Description
opensearch/build.gradle | 44 | high | New dependency added: 'org.opensearch.plugin:reindex-client'. Per mandatory supply chain policy, all dependency additions must be flagged regardless of apparent legitimacy. Maintainers should verify the artifact's origin, hash, and whether this version from the configured registry matches the expected OpenSearch distribution artifact.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Request Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@noCharger added the enhancement label (New feature or request) on Jul 3, 2026
@noCharger force-pushed the feature/ppl-collect branch from 480ccdd to 9293d6b on July 3, 2026 10:01
@github-actions

github-actions Bot commented Jul 3, 2026

Contributor

PR Reviewer Guide 🔍

(Review updated until commit 6098080)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Thread Safety

The PassThroughWriter class uses mutable shared state (bulk, buffered, drained, passThrough) accessed from moveNext() without synchronization. If the Enumerator is accessed concurrently (e.g., by multiple threads iterating the same Enumerable), race conditions can corrupt the bulk request or cause inconsistent writes. While Calcite's typical usage is single-threaded per query, the lack of thread-safety documentation or guards leaves the code vulnerable if the execution model changes or if the Enumerable is inadvertently shared.

private static class PassThroughWriter implements Enumerator<@Nullable Object> {

  private final OpenSearchClient client;
  private final String indexName;
  private final List<String> fields;
  private final Enumerator<@Nullable Object> source;
  private final int passThroughCap;
  private BulkRequest bulk = new BulkRequest();
  private int buffered = 0;
  private @Nullable Object current = null;
  private boolean drained = false;
  private Iterator<@Nullable Object> passThrough = null;

  PassThroughWriter(EnumerableOpenSearchTableSpool spool, Enumerator<@Nullable Object> source) {
    this.client = spool.getOsIndex().getClient();
    this.indexName = spool.getIndexName();
    this.fields = spool.getInput().getRowType().getFieldNames();
    this.passThroughCap = spool.getOsIndex().getSettings().getSettingValue(Key.QUERY_SIZE_LIMIT);
    this.source = source;
  }

  @Override
  public Object current() {
    return current;
  }

  @Override
  public boolean moveNext() {
    if (!drained) {
      drainAndWrite();
      drained = true;
    }
    if (passThrough.hasNext()) {
      current = passThrough.next();
      return true;
    }
    return false;
  }

  private void drainAndWrite() {
    List<@Nullable Object> passThroughBuffer = new ArrayList<>();
    while (source.moveNext()) {
      Object row = source.current();
      bulk.add(toIndexRequest(row));
      if (++buffered >= BATCH_SIZE) {
        flush();
      }
      if (passThroughBuffer.size() < passThroughCap) {
        passThroughBuffer.add(row);
      }
    }
    if (buffered > 0) {
      flush();
    }
    passThrough = passThroughBuffer.iterator();
  }
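
One way to make the single-threaded assumption explicit is a fail-fast guard. The sketch below is illustrative only: `SingleThreadGuard` is a hypothetical name, it wraps a plain `java.util.Iterator` rather than Calcite's `Enumerator`, and it binds to whichever thread makes the first call.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical wrapper: fails fast if a second thread touches the iterator. */
final class SingleThreadGuard<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  // Bound to whichever thread makes the first call.
  private final AtomicReference<Thread> owner = new AtomicReference<>();

  SingleThreadGuard(Iterator<T> delegate) {
    this.delegate = delegate;
  }

  private void checkOwner() {
    Thread me = Thread.currentThread();
    // The first caller claims ownership; any other thread is rejected.
    if (!owner.compareAndSet(null, me) && owner.get() != me) {
      throw new IllegalStateException(
          "iterator is confined to thread " + owner.get().getName());
    }
  }

  @Override
  public boolean hasNext() {
    checkOwner();
    return delegate.hasNext();
  }

  @Override
  public T next() {
    checkOwner();
    return delegate.next();
  }
}
```

A guard like this does not make the writer thread-safe; it only turns an accidental shared use into an immediate error instead of a corrupted bulk request.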

Unbounded Memory

The drainAndWrite method buffers up to passThroughCap rows in memory (passThroughBuffer) before iterating. If passThroughCap is large (e.g., QUERY_SIZE_LIMIT defaults to 10000), and rows are large objects, this can consume significant heap. For a source with 10500 rows and a cap of 10000, the buffer holds 10000 full row objects in memory simultaneously, which may cause OOM under load or with wide schemas.

private void drainAndWrite() {
  List<@Nullable Object> passThroughBuffer = new ArrayList<>();
  while (source.moveNext()) {
    Object row = source.current();
    bulk.add(toIndexRequest(row));
    if (++buffered >= BATCH_SIZE) {
      flush();
    }
    if (passThroughBuffer.size() < passThroughCap) {
      passThroughBuffer.add(row);
    }
  }
  if (buffered > 0) {
    flush();
  }
  passThrough = passThroughBuffer.iterator();
}
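
The arithmetic in this observation can be checked with a small model of the drain loop. This is a sketch under assumptions: rows are plain integers, the flush is replaced by a counter, and the batch size is a parameter because the value of `BATCH_SIZE` is not shown in the excerpt.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Sketch of the eager drain: every row is "written", only the first cap rows are kept. */
final class CappedDrain {
  /** Counts of what one drain did; the field names are illustrative. */
  record Result(int written, int flushes, List<Integer> passThrough) {}

  static Result drain(Iterator<Integer> source, int batchSize, int cap) {
    List<Integer> passThrough = new ArrayList<>();
    int buffered = 0;
    int written = 0;
    int flushes = 0;
    while (source.hasNext()) {
      Integer row = source.next();
      buffered++; // stands in for bulk.add(toIndexRequest(row))
      if (buffered >= batchSize) {
        written += buffered; // stands in for flush()
        buffered = 0;
        flushes++;
      }
      if (passThrough.size() < cap) {
        passThrough.add(row); // stays on the heap until the caller iterates
      }
    }
    if (buffered > 0) {
      written += buffered; // final partial batch
      flushes++;
    }
    return new Result(written, flushes, passThrough);
  }
}
```

With 10500 source rows, an assumed batch size of 1000, and a cap of 10000, this model writes all 10500 rows in 11 flushes and retains 10000 rows for pass-through, so the retained buffer is bounded by the cap rather than by the source size.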

Silent Failure

The bulkWithRetry method silently returns after exhausting retries or encountering non-429 failures, leaving some rows unwritten without surfacing an error to the caller. If the final BulkResponse contains failures (e.g., mapping conflicts, version conflicts), those failures are discarded, and the user receives no indication that the write was incomplete. This can lead to silent data loss.

private static void bulkWithRetry(OpenSearchClient client, BulkRequest request) {
  java.util.Iterator<TimeValue> backoff = BackoffPolicy.exponentialBackoff().iterator();
  BulkRequest pending = request;
  while (true) {
    BulkResponse response = client.bulk(pending);
    if (!response.hasFailures()) {
      return;
    }
    BulkRequest retry = new BulkRequest();
    for (BulkItemResponse item : response.getItems()) {
      if (item.isFailed() && item.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) {
        retry.add(pending.requests().get(item.getItemId()));
      }
    }
    if (retry.numberOfActions() == 0 || !backoff.hasNext()) {
      return;
    }
    try {
      Thread.sleep(backoff.next().millis());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return;
    }
    pending = retry;
  }
}
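
A stricter variant could throw in both silent-return cases. The following is a minimal sketch, not the PR's code: the `send` function is a hypothetical stand-in for `client.bulk` that returns one HTTP status per document, only status 200 counts as success here, and the backoff is passed in as a finite iterator of delays.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

/** Sketch: retry only 429 items, and throw instead of returning when rows stay unwritten. */
final class StrictBulkRetry {
  static final int OK = 200;
  static final int TOO_MANY_REQUESTS = 429;

  /**
   * @param send hypothetical stand-in for client.bulk: one HTTP status per document
   * @param backoffMillis finite sequence of delays; exhaustion ends the retries
   * @return number of send attempts made
   */
  static int write(
      List<String> docs,
      Function<List<String>, List<Integer>> send,
      Iterator<Long> backoffMillis) {
    List<String> pending = docs;
    int attempts = 0;
    while (true) {
      List<Integer> statuses = send.apply(pending);
      attempts++;
      List<String> retry = new ArrayList<>();
      for (int i = 0; i < statuses.size(); i++) {
        int status = statuses.get(i);
        if (status == TOO_MANY_REQUESTS) {
          retry.add(pending.get(i)); // retryable: try again after backoff
        } else if (status != OK) {
          throw new IllegalStateException(
              "non-retryable failure " + status + " for " + pending.get(i));
        }
      }
      if (retry.isEmpty()) {
        return attempts; // everything was accepted
      }
      if (!backoffMillis.hasNext()) {
        throw new IllegalStateException(
            retry.size() + " documents still rejected after retries");
      }
      sleep(backoffMillis.next());
      pending = retry;
    }
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted during backoff", e);
    }
  }
}
```

The design choice is that the method either returns with every document accepted or throws; there is no path that returns with rows unwritten.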

Interrupted State

In bulkWithRetry, when InterruptedException is caught during backoff sleep, the thread's interrupted flag is restored but the method returns immediately, abandoning the retry loop. If the caller expects the write to complete or to propagate the interruption as an exception, this silent return can cause the write to be incomplete without the caller knowing the thread was interrupted.

try {
  Thread.sleep(backoff.next().millis());
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  return;
}
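
A common alternative is to restore the flag and also throw, so the caller sees that the write stopped early. The sketch below assumes an unchecked wrapper is acceptable; `WriteInterruptedException` is a hypothetical name, not a type from the PR.

```java
/** Sketch: restore the interrupt flag and fail loudly instead of returning. */
final class InterruptAwareSleep {
  /** Hypothetical unchecked wrapper so callers without a throws clause still see it. */
  static final class WriteInterruptedException extends RuntimeException {
    WriteInterruptedException(InterruptedException cause) {
      super("collect write interrupted before all rows were flushed", cause);
    }
  }

  static void backoff(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the flag visible to outer frames
      throw new WriteInterruptedException(e);
    }
  }
}
```

Callers that only check the flag and callers that only catch exceptions both observe the interruption this way.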

@github-actions

github-actions Bot commented Jul 3, 2026

Contributor

PR Code Suggestions ✨

Latest suggestions up to 6098080
Explore these optional code suggestions:

Category | Suggestion | Impact
Possible issue
Surface non-retryable bulk write failures

The retry logic silently discards non-429 failures as soon as they occur, and drops
any remaining 429 failures once the backoff is exhausted. Write failures such as
permission errors or mapping conflicts should be surfaced to the caller rather than
ignored, because they mean rows were not written.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/EnumerableOpenSearchTableSpool.java [187-212]

 private static void bulkWithRetry(OpenSearchClient client, BulkRequest request) {
   java.util.Iterator<TimeValue> backoff = BackoffPolicy.exponentialBackoff().iterator();
   BulkRequest pending = request;
   while (true) {
     BulkResponse response = client.bulk(pending);
     if (!response.hasFailures()) {
       return;
     }
     BulkRequest retry = new BulkRequest();
+    boolean hasNonRetryableFailures = false;
     for (BulkItemResponse item : response.getItems()) {
-      if (item.isFailed() && item.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) {
-        retry.add(pending.requests().get(item.getItemId()));
+      if (item.isFailed()) {
+        if (item.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) {
+          retry.add(pending.requests().get(item.getItemId()));
+        } else {
+          hasNonRetryableFailures = true;
+        }
       }
+    }
+    if (hasNonRetryableFailures) {
+      throw new IllegalStateException("Bulk write failed with non-retryable errors: " + response.buildFailureMessage());
     }
     if (retry.numberOfActions() == 0 || !backoff.hasNext()) {
       return;
     }
     ...
   }
 }
Suggestion importance[1-10]: 9

__

Why: The current implementation silently discards non-429 failures (permission errors, mapping conflicts), which can lead to silent data loss. This is a critical issue that should be surfaced to the caller.

High
Flush pending writes on exception

If source.moveNext() throws an exception mid-stream, the partially buffered bulk
request is never flushed, causing silent data loss. Wrap the drain loop in
try-finally to ensure pending writes are flushed even on failure.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/EnumerableOpenSearchTableSpool.java [159-175]

 private void drainAndWrite() {
   List<@Nullable Object> passThroughBuffer = new ArrayList<>();
-  while (source.moveNext()) {
-    Object row = source.current();
-    bulk.add(toIndexRequest(row));
-    if (++buffered >= BATCH_SIZE) {
+  try {
+    while (source.moveNext()) {
+      Object row = source.current();
+      bulk.add(toIndexRequest(row));
+      if (++buffered >= BATCH_SIZE) {
+        flush();
+      }
+      if (passThroughBuffer.size() < passThroughCap) {
+        passThroughBuffer.add(row);
+      }
+    }
+  } finally {
+    if (buffered > 0) {
       flush();
     }
-    if (passThroughBuffer.size() < passThroughCap) {
-      passThroughBuffer.add(row);
-    }
-  }
-  if (buffered > 0) {
-    flush();
   }
   passThrough = passThroughBuffer.iterator();
 }
Suggestion importance[1-10]: 8

__

Why: If source.moveNext() throws an exception mid-stream, the partially buffered bulk request is never flushed, causing potential data loss. The try-finally pattern ensures pending writes are flushed even on failure.

Medium
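
One caveat with flushing inside `finally`: in Java, an exception thrown from a `finally` block replaces the exception already in flight, so a failing flush would hide the original source error. A hedged alternative is to flush in a `catch` and attach any flush failure as suppressed. The sketch below uses `Runnable` stand-ins for the read loop and the flush; both parameter names are illustrative.

```java
/** Sketch: flush what is buffered when the source fails, without hiding the original error. */
final class FlushOnFailure {
  static void drain(Runnable readAll, Runnable flush) {
    try {
      readAll.run();
    } catch (RuntimeException sourceFailure) {
      try {
        flush.run(); // best effort: persist rows that were already buffered
      } catch (RuntimeException flushFailure) {
        sourceFailure.addSuppressed(flushFailure); // keep both, primary stays first
      }
      throw sourceFailure;
    }
    flush.run(); // normal completion: a flush failure propagates as-is
  }
}
```

Whether partially written rows are desirable at all when the source fails is a separate design question that the suggestion does not settle.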
Security
Strengthen system index validation

The dot-prefix check only validates the first character. Malicious or malformed
index names like foo.bar or ..hidden could bypass this check. Use a more robust
validation that checks if the entire index name represents a system/hidden index
pattern.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [810-816]

-if (node.getIndexName().startsWith(".")) {
+String indexName = node.getIndexName();
+if (indexName.startsWith(".") || indexName.contains("/.")) {
   throw new SemanticCheckException(
       String.format(
           "collect cannot write to system or hidden index [%s]; dot-prefixed indices are"
               + " refused",
-          node.getIndexName()));
+          indexName));
 }
Suggestion importance[1-10]: 3

__

Why: The suggested check for contains("/.") is incorrect for OpenSearch index names, which don't contain slashes. The existing startsWith(".") check is sufficient for detecting dot-prefixed system/hidden indices.

Low
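
For reference, the two names mentioned in the suggestion can be run through the existing rule. This is only a sketch of the `startsWith(".")` predicate in isolation; `DotPrefixCheck` is an illustrative name and the real check lives in the visitor shown in the diff.

```java
/** Sketch of the dot-prefix rule in isolation; the class name is illustrative. */
final class DotPrefixCheck {
  /** Returns true when the existing rule would refuse the destination. */
  static boolean isRefused(String indexName) {
    return indexName.startsWith(".");
  }
}
```

Under this predicate `..hidden` is refused because it begins with a dot, while `foo.bar` is accepted because it is not dot-prefixed.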

Previous suggestions

Suggestions up to commit 9293d6b
Category | Suggestion | Impact
Possible issue
Add maximum retry limit

The infinite while (true) loop lacks a maximum retry limit, which could cause the
method to retry indefinitely if backoff exhaustion doesn't terminate it. Add an
explicit retry counter to prevent unbounded retries and ensure the method eventually
exits even under persistent failures.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/EnumerableOpenSearchTableSpool.java [187-212]

 private static void bulkWithRetry(OpenSearchClient client, BulkRequest request) {
   java.util.Iterator<TimeValue> backoff = BackoffPolicy.exponentialBackoff().iterator();
   BulkRequest pending = request;
-  while (true) {
+  int maxRetries = 10;
+  int retryCount = 0;
+  while (retryCount < maxRetries) {
     BulkResponse response = client.bulk(pending);
     if (!response.hasFailures()) {
       return;
     }
     BulkRequest retry = new BulkRequest();
     for (BulkItemResponse item : response.getItems()) {
       if (item.isFailed() && item.getFailure().getStatus() == RestStatus.TOO_MANY_REQUESTS) {
         retry.add(pending.requests().get(item.getItemId()));
       }
     }
     if (retry.numberOfActions() == 0 || !backoff.hasNext()) {
       return;
     }
     try {
       Thread.sleep(backoff.next().millis());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return;
     }
     pending = retry;
+    retryCount++;
   }
 }
Suggestion importance[1-10]: 7

__

Why: The infinite while (true) loop relies solely on backoff.hasNext() to terminate, which may not provide a hard upper bound on retries. Adding an explicit maxRetries counter improves robustness by ensuring the method eventually exits even if the backoff policy doesn't terminate as expected, preventing unbounded retries under persistent failures.

Medium
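
Whether an extra counter is needed depends on whether the backoff iterator is finite, which is worth confirming against the OpenSearch `BackoffPolicy` Javadoc. The sketch below uses a hypothetical `FiniteBackoff` with doubling delays (not the OpenSearch formula) to show that a `while (true)` loop exits once such an iterator is exhausted.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical finite backoff: doubles the delay and stops after maxRetries values. */
final class FiniteBackoff implements Iterable<Long> {
  private final long initialMillis;
  private final int maxRetries;

  FiniteBackoff(long initialMillis, int maxRetries) {
    this.initialMillis = initialMillis;
    this.maxRetries = maxRetries;
  }

  @Override
  public Iterator<Long> iterator() {
    return new Iterator<>() {
      private int used = 0;

      @Override
      public boolean hasNext() {
        return used < maxRetries;
      }

      @Override
      public Long next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return initialMillis << used++; // 50, 100, 200, ... for initialMillis = 50
      }
    };
  }

  /** Mirrors the loop shape: counts attempts when every attempt is rejected. */
  static int attemptsUntilExhausted(Iterator<Long> backoff) {
    int attempts = 0;
    while (true) {
      attempts++; // stands in for client.bulk(pending) returning only 429s
      if (!backoff.hasNext()) {
        return attempts; // same exit as the !backoff.hasNext() check
      }
      backoff.next(); // the real loop would sleep for this long
    }
  }
}
```

With 8 retries the loop makes 9 attempts and then exits, so a finite iterator already provides the upper bound that the explicit counter would add.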
General
Validate row-schema field count match

When values.length is less than fields.size(), the loop silently skips trailing
fields without logging or validation. This could mask data loss if the row structure
doesn't match the expected schema. Add validation to detect and handle schema
mismatches explicitly.

opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/EnumerableOpenSearchTableSpool.java [214-224]

 private IndexRequest toIndexRequest(@Nullable Object row) {
   Object[] values = (row instanceof Object[] arr) ? arr : new Object[] {row};
+  if (values.length < fields.size()) {
+    throw new IllegalStateException(
+      String.format("Row has %d values but schema expects %d fields", values.length, fields.size()));
+  }
   Map<String, Object> doc = new LinkedHashMap<>();
   for (int i = 0; i < fields.size() && i < values.length; i++) {
     String name = fields.get(i);
     if (values[i] != null && !OpenSearchIndex.METADATAFIELD_TYPE_MAP.containsKey(name)) {
       doc.put(name, values[i]);
     }
   }
   return new IndexRequest(indexName).source(doc);
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion adds validation to detect when values.length is less than fields.size(), which could indicate a schema mismatch. However, the current code already handles this gracefully by iterating only up to the minimum of the two lengths (i < fields.size() && i < values.length), so the validation may be overly strict and could reject valid cases where trailing fields are intentionally absent.

Low
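
The trade-off between the lenient loop and the suggested strict check can be seen side by side. This sketch models the mapping with plain collections: `METADATA_FIELDS` is an assumed stand-in for the keys of `OpenSearchIndex.METADATAFIELD_TYPE_MAP`, and the `strict` flag is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch of the row-to-document mapping with an optional strict length check. */
final class RowToDocument {
  // Assumed stand-in for the metadata field names; the real set may differ.
  static final Set<String> METADATA_FIELDS = Set.of("_id", "_index", "_score");

  static Map<String, Object> toDocument(List<String> fields, Object row, boolean strict) {
    // A scalar row is treated as a one-column row, as in the excerpt.
    Object[] values = (row instanceof Object[] arr) ? arr : new Object[] {row};
    if (strict && values.length < fields.size()) {
      throw new IllegalStateException(
          String.format(
              "Row has %d values but schema expects %d fields",
              values.length, fields.size()));
    }
    Map<String, Object> doc = new LinkedHashMap<>();
    for (int i = 0; i < fields.size() && i < values.length; i++) {
      String name = fields.get(i);
      // Null values and metadata fields are left out of the document.
      if (values[i] != null && !METADATA_FIELDS.contains(name)) {
        doc.put(name, values[i]);
      }
    }
    return doc;
  }
}
```

In lenient mode a two-value row against fields `a`, `b`, `c` yields a document with only the non-null leading fields; in strict mode the same row is rejected.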
Validate index name before prefix check

The dot-prefix check only validates the first character but doesn't handle edge
cases like empty strings or null values. Add explicit null/empty validation before
the prefix check to prevent potential NullPointerException or unexpected behavior.

core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java [810-816]

-if (node.getIndexName().startsWith(".")) {
+String indexName = node.getIndexName();
+if (indexName == null || indexName.isEmpty()) {
+  throw new SemanticCheckException("collect destination index name cannot be null or empty");
+}
+if (indexName.startsWith(".")) {
   throw new SemanticCheckException(
       String.format(
           "collect cannot write to system or hidden index [%s]; dot-prefixed indices are"
               + " refused",
-          node.getIndexName()));
+          indexName));
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion adds null/empty validation for node.getIndexName() before the dot-prefix check. While this improves defensive programming, the indexName is already validated earlier in the code path (it's a required field in the Collect AST node and is resolved through the catalog), so a null or empty value at this point would indicate a deeper bug rather than a user input issue.

Low

Terminal pass-through write: appends pipeline rows to a pre-existing index and
returns them as the result. RFC: opensearch-project#5596.

- Grammar (ppl OpenSearchPPL g4) + Collect AST + AstBuilder + anonymizer
- visitCollect builds LogicalTableSpool; EnumerableOpenSearchTableSpool + converter rule
  do the eager-drain batched-bulk write (429 retry/backoff) with pass-through output
- Options: source/host/sourcetype/marker stamps + testmode dry-run
- Plan-time safety: destination pre-existence + dot/hidden-index refusal; V2/legacy
  rejects collect (calcite-only), consistent with other Calcite-path commands
- OpenSearchClient.bulk is a default method so external implementors are unaffected
- Tests: CalcitePPLCollectTest, CollectWriteStrategyTest, NewAddedCommandsIT, CalcitePPLCollectIT
- User doc: docs/user/ppl/cmd/collect.md

Signed-off-by: Louis Chu <lingzhichu.clz@gmail.com>
@noCharger force-pushed the feature/ppl-collect branch from 9293d6b to 6098080 on July 3, 2026 11:07
@github-actions

github-actions Bot commented Jul 3, 2026

Contributor

Persistent review updated to latest commit 6098080

Labels

enhancement New feature or request
