Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ deploy-kafka: deploy deploy-flink
kubectl wait --for=condition=Established=True crds/kafkas.kafka.strimzi.io
kubectl apply -f ./deploy/samples/kafkadb.yaml
kubectl apply -f ./deploy/dev/kafka.yaml
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=10m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=10m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=10m
kubectl wait kafka.kafka.strimzi.io/one --for=condition=Ready --timeout=15m -n kafka
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-1 --for=condition=Ready --timeout=15m
kubectl wait kafkatopic.kafka.strimzi.io/kafka-database-existing-topic-2 --for=condition=Ready --timeout=15m

undeploy-kafka:
kubectl delete kafkatopic.kafka.strimzi.io --all || echo "skipping"
Expand Down
4 changes: 2 additions & 2 deletions deploy/dev/kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ metadata:
labels:
strimzi.io/cluster: one
spec:
replicas: 3
replicas: 1
roles:
- controller
storage:
Expand All @@ -42,7 +42,7 @@ metadata:
labels:
strimzi.io/cluster: one
spec:
replicas: 3
replicas: 1
roles:
- broker
storage:
Expand Down
31 changes: 31 additions & 0 deletions docs/user-guide/ddl-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,37 @@ remains imperative and explicit. Detection of *incompatible* metadata changes
`CREATE` and `CREATE OR REPLACE` apply the new definition regardless. Use
caution when changing schemas of in-flight pipelines.

## Dry-run: validating a script without deploying

The `deploy` connection property toggles whether DDL actually touches the
underlying deployers.

| `deploy` value | Behavior |
|--------------------|-----------------------------------------------------------------------------|
| `true` (default) | Normal operation. Each DDL invokes the deployers (create/update/delete). |
| `false` | Dry-run. Each DDL is parsed, validated, and applied to the in-memory schema, but no deployer is invoked. |

```
jdbc:hoptimator://...;deploy=false
```

In dry-run mode, a session can execute a multi-statement script end-to-end —
later statements see the in-memory effects of earlier ones (a `CREATE TABLE
FOO` followed by `CREATE VIEW BAR AS SELECT * FROM FOO` validates correctly),
and `DROP` removes its target from the in-memory schema so a follow-up
`CREATE` of the same name doesn't collide. Nothing reaches the deployers.

`deploy=false` is orthogonal to `mode`: combining it with `mode=apply`
dry-runs an apply-mode script.

> Dry-run is distinct from `!specify` (and the underlying `SPECIFY` mode;
> see [CLI reference](sql-cli.md#specify-sql)),
> which is the strict zero-side-effect preview used to render deployment
> artifacts for a single statement. `!specify` always invokes
> `deployer.specify()` and unwinds the in-memory schema afterward. Dry-run
> preserves the in-memory mutation across statements and invokes no deployer
> method at all.

## CREATE VIEW

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,18 @@ public void execute(SqlCreateView create, CalcitePrepare.Context context) {
deployers = DeploymentService.deployers(view, connection);
ValidationService.validateOrThrow(deployers, connection);
logger.info("Validated view {}", viewName);
if (mode == HoptimatorDdlUtils.DdlMode.UPDATE) {
boolean dryRun = HoptimatorDdlUtils.isDryRun(connection);
if (dryRun) {
logger.info("Dry-run (deploy=false): skipping {} of view {}", mode, viewName);
} else if (mode == HoptimatorDdlUtils.DdlMode.UPDATE) {
logger.info("Deploying update view {}", viewName);
DeploymentService.update(deployers);
} else {
logger.info("Deploying create view {}", viewName);
DeploymentService.create(deployers);
}
logger.info("Deployed view {}", viewName);
mode.executeDeployers(deployers, connection);
if (!dryRun) {
logger.info("Deployed view {}", viewName);
}
schemaPlus.add(viewName, viewTable);
logger.info("Added view {} to schema {}", viewName, schemaPlus.getName());
} catch (SQLException | RuntimeException e) {
Expand Down Expand Up @@ -241,14 +245,18 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) {
ValidationService.validateOrThrow(deployers, connection);
logger.info("Validated trigger {}", name);
HoptimatorDdlUtils.DdlMode mode = HoptimatorDdlUtils.effectiveMode(create.getReplace(), connection);
if (mode == HoptimatorDdlUtils.DdlMode.UPDATE) {
boolean dryRun = HoptimatorDdlUtils.isDryRun(connection);
if (dryRun) {
logger.info("Dry-run (deploy=false): skipping {} of trigger {}", mode, name);
} else if (mode == HoptimatorDdlUtils.DdlMode.UPDATE) {
logger.info("Updating trigger {}", name);
DeploymentService.update(deployers);
} else {
logger.info("Creating trigger {}", name);
DeploymentService.create(deployers);
}
logger.info("Deployed trigger {}", name);
mode.executeDeployers(deployers, connection);
if (!dryRun) {
logger.info("Deployed trigger {}", name);
}
logger.info("CREATE TRIGGER {} completed", name);
} catch (Exception e) {
if (deployers != null) {
Expand Down Expand Up @@ -336,7 +344,11 @@ public void execute(SqlFireTrigger fire, CalcitePrepare.Context context) {
try {
logger.info("Firing trigger {} with {} option(s)", name, options.size() - 1);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.update(deployers);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping fire of trigger {}", name);
} else {
DeploymentService.update(deployers);
}
logger.info("FIRE TRIGGER {} completed", name);
} catch (Exception e) {
if (deployers != null) {
Expand Down Expand Up @@ -364,10 +376,14 @@ public void execute(SqlDropTrigger drop, CalcitePrepare.Context context) {

Collection<Deployer> deployers = null;
try {
logger.info("Deleting trigger {}", name);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.delete(deployers);
logger.info("Deleted trigger {}", name);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping delete of trigger {}", name);
} else {
logger.info("Deleting trigger {}", name);
DeploymentService.delete(deployers);
logger.info("Deleted trigger {}", name);
}
logger.info("DROP TRIGGER {} completed", name);
} catch (Exception e) {
if (deployers != null) {
Expand Down Expand Up @@ -401,10 +417,14 @@ private void updateTriggerPausedState(SqlNode sqlNode, SqlIdentifier triggerName

Collection<Deployer> deployers = null;
try {
logger.info("Updating trigger {} with paused state: {}", name, paused);
deployers = DeploymentService.deployers(trigger, connection);
DeploymentService.update(deployers);
logger.info("Successfully updated trigger {} with paused state: {}", name, paused);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping update of trigger {} (paused state: {})", name, paused);
} else {
logger.info("Updating trigger {} with paused state: {}", name, paused);
DeploymentService.update(deployers);
logger.info("Successfully updated trigger {} with paused state: {}", name, paused);
}
} catch (Exception e) {
if (deployers != null) {
DeploymentService.restore(deployers);
Expand Down Expand Up @@ -459,8 +479,12 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
MaterializedViewTable materializedViewTable = (MaterializedViewTable) table;
View view = new View(tablePath, materializedViewTable.viewSql());
deployers = DeploymentService.deployers(view, connection);
logger.info("Deleting materialized view {}", tableName);
DeploymentService.delete(deployers);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping delete of materialized view {}", tableName);
} else {
logger.info("Deleting materialized view {}", tableName);
DeploymentService.delete(deployers);
}
schemaPlus.removeTable(tableName);
logger.info("Removed materialized table {} from schema {}", tableName, schemaPlus.getName());
} else if (table instanceof ViewTable) {
Expand All @@ -471,8 +495,12 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
ViewTable viewTable = (ViewTable) table;
View view = new View(tablePath, viewTable.getViewSql());
deployers = DeploymentService.deployers(view, connection);
logger.info("Deleting view {}", tableName);
DeploymentService.delete(deployers);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping delete of view {}", tableName);
} else {
logger.info("Deleting view {}", tableName);
DeploymentService.delete(deployers);
}
schemaPlus.removeTable(tableName);
logger.info("Removed view {} from schema {}", tableName, schemaPlus.getName());
} else if (table instanceof HoptimatorJdbcTable || table instanceof TemporaryTable) {
Expand All @@ -494,8 +522,12 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) {
// before any deployer-level state change.
ValidationService.validateOrThrow(new PendingDelete<>(source), connection);
deployers = DeploymentService.deployers(source, connection);
logger.info("Deleting table {}", tableName);
DeploymentService.delete(deployers);
if (HoptimatorDdlUtils.isDryRun(connection)) {
logger.info("Dry-run (deploy=false): skipping delete of table {}", tableName);
} else {
logger.info("Deleting table {}", tableName);
DeploymentService.delete(deployers);
}
schemaPlus.removeTable(tableName);
logger.info("Removed table {} from schema {}", tableName, schemaPlus.getName());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,42 @@ private HoptimatorDdlUtils() {
/** Apply-mode value of {@link #MODE_PROPERTY}. */
public static final String MODE_APPLY = "apply";

/**
* Connection property that controls whether DDL statements actually deploy resources.
*
* <ul>
* <li>{@code true} (default) — normal operation. Each DDL invokes the underlying
* deployers (create/update/delete) to mutate external systems.</li>
* <li>{@code false} — dry-run. Each DDL is parsed, validated, and applied to the
* in-memory Calcite schema so subsequent statements in the same session can
* reference it; the underlying deployers are <em>not</em> touched. Useful for
* validating a multi-statement script end-to-end without producing side effects.</li>
* </ul>
*
* <p>Orthogonal to {@link #MODE_PROPERTY}: {@code mode=apply} + {@code deploy=false}
* dry-runs an apply-mode script.
*
* <p>Distinct from {@code SPECIFY} (the strict zero-side-effect path used by
* {@code !specify} and friends), which unwinds the in-memory schema and still invokes
* {@code deployer.specify()}. Dry-run preserves the in-memory mutation and does not
* invoke any deployer method.
*
* <p>Set per connection on the JDBC URL, e.g. {@code jdbc:hoptimator://...;deploy=false}
* — see {@code docs/user-guide/ddl-reference.md}.
*/
public static final String DEPLOY_PROPERTY = "deploy";

/**
* Resolves the effective {@link DdlMode} for a {@code CREATE} statement, combining the
* statement's {@code OR REPLACE} flag with the connection's {@link #MODE_PROPERTY}.
*
* <p>In {@code create} mode (the default): {@code CREATE} → {@link DdlMode#CREATE} and
* {@code CREATE OR REPLACE} → {@link DdlMode#UPDATE}. In {@code apply} mode: both forms
* resolve to {@link DdlMode#UPDATE}, making CREATE idempotent.
*
* <p>Independent of {@link #DEPLOY_PROPERTY}: the returned mode is the same in dry-run
* and live runs, since dry-run is decided inside {@link DdlMode#executeDeployers} by
* consulting {@link #isDryRun}.
*/
static DdlMode effectiveMode(boolean orReplace, HoptimatorConnection conn) {
if (isApplyMode(conn)) {
Expand All @@ -136,6 +165,28 @@ static boolean isApplyMode(HoptimatorConnection conn) {
return MODE_APPLY.equalsIgnoreCase(mode);
}

/** Whether the connection is configured for dry-run DDL (see {@link #DEPLOY_PROPERTY}). */
static boolean isDryRun(Connection conn) {
if (!(conn instanceof HoptimatorConnection)) {
return false;
}
Properties props = ((HoptimatorConnection) conn).connectionProperties();
if (props == null) {
return false;
}
return "false".equalsIgnoreCase(props.getProperty(DEPLOY_PROPERTY, "true"));
}

/**
* Whether deployment should be skipped for this mode + connection combination.
* Returns {@code true} only for mutable modes (CREATE/UPDATE) when the connection
* has {@code deploy=false}. SPECIFY is never skipped — it renders specs as its
* primary purpose.
*/
static boolean shouldSkipDeployment(DdlMode mode, Connection conn) {
return mode.mutable() && isDryRun(conn);
}

/**
* The result of a {@link #specifyFromSql} call: the YAML artifact specs, the sink row type,
* and the fully-qualified path of the sink (viewPath).
Expand All @@ -160,14 +211,23 @@ public static final class SpecifyResult {
}

/**
* Controls whether a DDL operation performs a real deployment (CREATE or UPDATE)
* or a dry-run (SPECIFY).
* Controls how a {@code CREATE} statement is resolved against the connection's
* {@link #MODE_PROPERTY}.
*
* <p>{@code CREATE} maps to either {@link #CREATE} (strict) or {@link #UPDATE} (apply-mode
* convergence or explicit {@code CREATE OR REPLACE}). {@link #SPECIFY} is the strict
* zero-side-effect preview used by {@code !specify} and friends.
*
* <p>Dry-run (see {@link #DEPLOY_PROPERTY}) is orthogonal and resolved inline at each
* deployer call site by checking {@link #isDryRun}.
*/
enum DdlMode {
CREATE {
@Override
List<String> executeDeployers(Collection<Deployer> deployers, Connection conn) throws SQLException {
DeploymentService.create(deployers);
if (!isDryRun(conn)) {
DeploymentService.create(deployers);
}
return Collections.emptyList();
}

Expand All @@ -179,7 +239,9 @@ boolean mutable() {
UPDATE {
@Override
List<String> executeDeployers(Collection<Deployer> deployers, Connection conn) throws SQLException {
DeploymentService.update(deployers);
if (!isDryRun(conn)) {
DeploymentService.update(deployers);
}
return Collections.emptyList();
}

Expand Down Expand Up @@ -443,18 +505,23 @@ static SpecifyResult processCreateMaterializedView(CalcitePrepare.Context ctx,
logger.info("Validated materialized view {}", viewName);

// Execute (create/update) or collect specs (specify).
if (mode == DdlMode.UPDATE) {
boolean dryRun = shouldSkipDeployment(mode, conn);
if (dryRun) {
logger.info("Dry-run (deploy=false): skipping {} of materialized view {}", mode, viewName);
} else if (mode == DdlMode.UPDATE) {
logger.info("Deploying update materialized view {}", viewName);
} else if (mode == DdlMode.CREATE) {
logger.info("Deploying create materialized view {}", viewName);
} else {
logger.info("Specifying materialized view {}", viewName);
}
List<String> specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
if (mode.mutable() && !dryRun) {
logger.info("Deployed materialized view {}", viewName);
} else {
// SPECIFY (dry-run): roll back any side effects made by deployers during specify().
} else if (!mode.mutable()) {
// SPECIFY (single-statement preview): roll back any side effects made by deployers
// during specify(). Note: deploy=false dry-run does not touch deployers at all and
// therefore has nothing to restore.
DeploymentService.restore(deployers);
}
success = true;
Expand Down Expand Up @@ -650,18 +717,23 @@ public RexNode newColumnDefaultValue(RelOptTable table, int iColumn,
logger.info("Validating deployable resources for table {}", tableName);
ValidationService.validateOrThrow(deployers, conn);

if (mode == DdlMode.UPDATE) {
boolean dryRun = shouldSkipDeployment(mode, conn);
if (dryRun) {
logger.info("Dry-run (deploy=false): skipping {} of table {}", mode, source);
} else if (mode == DdlMode.UPDATE) {
logger.info("Deploying update table {}", source);
} else if (mode == DdlMode.CREATE) {
logger.info("Deploying create table {}", source);
} else {
logger.info("Specifying table {}", source);
}
List<String> specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
if (mode.mutable() && !dryRun) {
logger.info("Deployed table {}", source);
} else {
// SPECIFY (dry-run): roll back any side effects made by deployers during specify()
} else if (!mode.mutable()) {
// SPECIFY (single-statement preview): roll back any side effects made by deployers
// during specify(). Note: deploy=false dry-run does not touch deployers at all and
// therefore has nothing to restore.
DeploymentService.restore(deployers);
}
success = true;
Expand Down Expand Up @@ -727,10 +799,17 @@ static SpecifyResult processCreateDatabase(HoptimatorConnection conn,
deployers = DeploymentService.deployers(database, conn);
ValidationService.validateOrThrow(deployers, conn);

boolean dryRun = shouldSkipDeployment(mode, conn);
if (dryRun) {
logger.info("Dry-run (deploy=false): skipping {} of database {}", mode, name);
}
List<String> specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
if (mode.mutable() && !dryRun) {
logger.info("Deployed database {}", name);
} else {
} else if (!mode.mutable()) {
// SPECIFY (single-statement preview): roll back any side effects made by deployers
// during specify(). Note: deploy=false dry-run does not touch deployers at all and
// therefore has nothing to restore.
DeploymentService.restore(deployers);
}
return new SpecifyResult(specs, null, Collections.singletonList(name));
Expand Down
Loading
Loading