Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions engine/src/main/java/com/github/jinba1/cuckoodb/ColumnMeta.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.github.jinba1.cuckoodb;

/**
* Describes one output column of a {@link QueryResultSet}, in result-column order.
*
* <p>Result rows are strictly positional and column names are <em>not</em> unique — a
* join {@code SELECT *} can emit the same bare name twice (e.g. two {@code a} columns).
* {@code qualifiedName} disambiguates those: it is the dotted schema origin
* (e.g. {@code student.a} vs {@code enrolled.a}) when one exists, and {@code null} for
* aggregate/computed columns and for single-table scans whose schema carries no table
* prefix. Clients must address columns by position, not by name.
*
* <p>{@code type} is best-effort, inferred from the first row's runtime value, so it is
* {@code null} for an empty result (no row to infer from). The authoritative typed schema
* for a base table comes from the catalog, not this field.
*
* @param name the bare header name (table prefix stripped; aggregate keys kept whole)
* @param qualifiedName the dotted schema origin, or null when there is none
* @param type the inferred column type, or null when the result is empty
*/
public record ColumnMeta(String name, String qualifiedName, ColumnType type) {
}
123 changes: 117 additions & 6 deletions engine/src/main/java/com/github/jinba1/cuckoodb/CuckooDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
Expand Down Expand Up @@ -125,15 +127,13 @@ public static QueryResult execute(Operator root, String outputFile) {
try {
try (CSVPrinter printer = new CSVPrinter(new FileWriter(outputFile), format)) {
printer.printRecord(headers);
Tuple tuple;
while ((tuple = root.getNextTuple()) != null) {
rows = drain(root, tuple -> {
List<String> fields = new ArrayList<>(tuple.getTuple().size());
for (Value v : tuple.getTuple()) {
fields.add(v.toString());
}
printer.printRecord(fields);
rows++;
}
});
}
} catch (RuntimeException e) {
// QueryExecutionException and internal errors alike: never leave a
Expand All @@ -148,8 +148,7 @@ public static QueryResult execute(Operator root, String outputFile) {
"Failed to write output file '" + outputFile + "': " + e.getMessage());
}

// The planner places Limit topmost, so the root knows whether the cap cut the result
boolean truncated = root instanceof LimitOperator limitOp && limitOp.wasTruncated();
boolean truncated = wasTruncated(root);
QueryResult result = truncated ? QueryResult.truncated(rows) : QueryResult.complete(rows);

System.out.println("Query executed successfully!");
Expand All @@ -158,6 +157,118 @@ public static QueryResult execute(Operator root, String outputFile) {
return result;
}

/**
* Drains the plan into an in-memory result set — column metadata, positional rows, and the
* same truncation/hint signal {@link #execute} reports, but with no stdout and no file write.
* This is the library/REST entry point; the CLI keeps using {@link #execute}. Both share
* {@link #drain}, so they cannot diverge in iteration, row count, or truncation semantics.
* @param root the root operator of an executable plan (never null; EXPLAIN is handled before here)
* @return the fully-materialized result
*/
public static QueryResultSet executeToResultSet(Operator root) {
String schemaId = root.propagateSchemaId();
List<String> names = root.getContext().getOrderedColumnNames(schemaId);
List<List<Value>> rows = new ArrayList<>();
try {
drain(root, tuple -> rows.add(List.copyOf(tuple.getTuple())));
} catch (IOException e) {
// The in-memory sink does no I/O; drain only declares IOException for the CSV path.
throw new QueryExecutionException(ErrorCode.INTERNAL,
"Unexpected I/O while draining query result: " + e.getMessage());
}
boolean truncated = wasTruncated(root);
String hint = truncated ? QueryResult.truncated(rows.size()).hint() : null;
List<ColumnMeta> columns = buildColumns(root.getContext(), schemaId, names, rows);
// The result is a value handed to library/REST callers; the outer lists are
// unmodifiable so a consumer cannot mutate the result after the fact (each inner
// row is already immutable via List.copyOf in the drain sink).
return new QueryResultSet(Collections.unmodifiableList(columns),
Collections.unmodifiableList(rows), truncated, hint);
}

/**
* Pulls every tuple from the plan to EOF, handing each to {@code sink}, and returns the row
* count. The single drain point for both result paths: truncation can only be read once this
* has run to a null tuple, so sharing it keeps the file and in-memory paths consistent.
*/
private static long drain(Operator root, TupleSink sink) throws IOException {
long rows = 0;
Tuple tuple;
while ((tuple = root.getNextTuple()) != null) {
sink.accept(tuple);
rows++;
}
return rows;
}

/** A per-tuple action that may fail with an I/O error (the CSV writer's printRecord does). */
@FunctionalInterface
private interface TupleSink {
void accept(Tuple tuple) throws IOException;
}

/**
* Whether a LIMIT cut the result short. The planner places LIMIT topmost, so only the root
* can report this, and only after the drain reached EOF (the peek past the cap has happened).
*/
private static boolean wasTruncated(Operator root) {
return root instanceof LimitOperator limitOp && limitOp.wasTruncated();
}

/**
* Builds per-position column metadata for a result set. {@code name} is the bare header
* (table prefix stripped, aggregate keys kept whole); {@code qualifiedName} is the dotted,
* non-aggregate schema key for that index when one exists (null for single-table scans and
* computed columns); {@code type} is inferred from the first row, or null for an empty result.
*/
private static List<ColumnMeta> buildColumns(PlanContext ctx, String schemaId,
List<String> names, List<List<Value>> rows) {
int width = names.size();
String[] qualified = new String[width];
Map<String, Integer> schema = ctx.getSchema(schemaId);
if (schema != null) {
// Mirror getOrderedColumnNames' deterministic rule — sorted keys, first non-null
// wins per index — so `name` and `qualifiedName` are chosen from a single key
// choice and never disagree run-to-run. Skip internal intermediate-schema keys
// (temp_<hex>.col) outright: a join over a pushed-down source registers BOTH a
// base-qualified key (Enrolled.a) and a temp_ key at the same index, and the
// temp_ id is not a real origin (and is plan/JVM-unstable).
List<String> keys = new ArrayList<>(schema.keySet());
Collections.sort(keys);
for (String key : keys) {
int idx = schema.get(key);
if (idx < 0 || idx >= width || qualified[idx] != null
|| key.startsWith(Constants.INTERMEDIATE_SCHEMA_PREFIX)) {
continue;
}
// A dotted, non-aggregate key (e.g. "Student.a") is the column's qualified
// origin. Aggregate keys like "sum(student.c)" also contain '.', so the '('
// check excludes them. Lowercased to match the bare name (-> "student.a").
if (key.indexOf('.') >= 0 && key.indexOf('(') < 0) {
qualified[idx] = key.toLowerCase();
}
}
}
List<Value> firstRow = rows.isEmpty() ? null : rows.get(0);
List<ColumnMeta> columns = new ArrayList<>(width);
for (int i = 0; i < width; i++) {
ColumnType type = firstRow == null ? null : inferType(firstRow.get(i));
columns.add(new ColumnMeta(names.get(i), qualified[i], type));
}
return columns;
}

/** Maps a runtime value to its column type. Exhaustive over the sealed {@link Value}. */
private static ColumnType inferType(Value value) {
if (value instanceof IntValue) {
return ColumnType.INT;
} else if (value instanceof StringValue) {
return ColumnType.STRING;
}
throw new QueryExecutionException(ErrorCode.INTERNAL,
"Unknown value type: " + value.getClass().getName());
}

/** Never leave a truncated file that looks like a complete result. */
private static void deletePartialOutput(File outputFileObj, String outputFile) {
if (outputFileObj.isFile() && !outputFileObj.delete()) {
Expand Down
87 changes: 69 additions & 18 deletions engine/src/main/java/com/github/jinba1/cuckoodb/DBCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

/**
Expand All @@ -29,17 +30,19 @@ public class DBCatalog {

private static DBCatalog instance;

private final Map<String, Path> dbLocations;
private final Map<String, Map<String, Integer>> dbSchemata;
private final Map<String, List<ColumnType>> dbColumnTypes;
/**
* One entry per table, each a fully-populated {@link TableMeta} (location + schema +
* types). A single map means a table is published with one atomic write and read with
* one atomic lookup — no torn state across what used to be three parallel maps. This is
* what makes runtime {@link #registerTable} safe against concurrent readers.
*/
private final Map<String, TableMeta> tables;

/**
* Private constructor to ensure singleton design.
*/
private DBCatalog() {
dbLocations = new java.util.concurrent.ConcurrentHashMap<>();
dbSchemata = new java.util.concurrent.ConcurrentHashMap<>();
dbColumnTypes = new java.util.concurrent.ConcurrentHashMap<>();
tables = new ConcurrentHashMap<>();
}

/**
Expand Down Expand Up @@ -94,15 +97,48 @@ private void loadDBCatalog(String dBDirectory) {
for (Path csv : csvs) {
String fileName = csv.getFileName().toString();
String tableName = fileName.substring(0, fileName.length() - 4);
loadTable(tableName, csv);
tables.put(tableName, parseTable(tableName, csv));
}
} catch (IOException e) {
throw new QueryExecutionException(ErrorCode.DATA_ERROR,
"Error scanning data directory " + dataPath + ": " + e.getMessage());
}
}

private void loadTable(String tableName, Path csv) throws IOException {
/**
* Registers a single table at runtime from a CSV file, for callers (the REST gateway)
* that add tables after startup. Parses and infers the schema with the same rules as
* directory load, then publishes the table with one atomic {@code putIfAbsent}.
* <p>
* Returns {@code false} (and leaves the existing table untouched) when a table of that
* name already exists — the caller's 409 signal. Never {@code containsKey}-then-{@code put},
* which would reintroduce the race this design removes.
* @param tableName the catalog name to publish under
* @param csv the CSV file backing the table; must outlive every query that scans it
* @return {@code true} if this call registered the table; {@code false} if the name was taken
* @throws QueryExecutionException with {@link ErrorCode#DATA_ERROR} if the CSV is malformed
*/
public boolean registerTable(String tableName, Path csv) {
TableMeta meta;
try {
meta = parseTable(tableName, csv);
} catch (IOException e) {
throw new QueryExecutionException(ErrorCode.DATA_ERROR,
"Could not read table '" + tableName + "' from " + csv + ": " + e.getMessage());
}
return tables.putIfAbsent(tableName, meta) == null;
}

/**
* Parses one CSV into table metadata: column names from the header row, types inferred
* from the data rows. Pure (no map writes) so both directory load and {@link #registerTable}
* share identical parse-and-infer rules. The returned schema and types are unmodifiable.
* <p>Type inference starts every column as INT and demotes to STRING on the first
* non-integer field, so a <em>header-only</em> CSV (no data rows) infers every column as
* INT — there is no evidence to refute INT. REST callers uploading a schema-only file get
* an all-INT table until rows arrive.
*/
private static TableMeta parseTable(String tableName, Path csv) throws IOException {
CSVFormat format = CSVFormat.RFC4180.builder()
.setIgnoreSurroundingSpaces(true)
.build();
Expand Down Expand Up @@ -146,9 +182,9 @@ private void loadTable(String tableName, Path csv) throws IOException {
types.add(isInt[i] ? ColumnType.INT : ColumnType.STRING);
}

dbSchemata.put(tableName, Collections.unmodifiableMap(columnMap));
dbColumnTypes.put(tableName, Collections.unmodifiableList(types));
dbLocations.put(tableName, csv);
return new TableMeta(csv,
Collections.unmodifiableMap(columnMap),
Collections.unmodifiableList(types));
}
}

Expand All @@ -162,13 +198,25 @@ private static boolean parsesAsInt(String field) {
}
}

/**
* Returns the full metadata for a table in one atomic lookup, or null if absent.
* Callers that need more than one of location/schema/types (e.g. ScanOperator) use
* this so the fields they read are guaranteed to come from the same registration.
* @param tableName The name of the table
* @return The table's metadata, or null if the table is not in the catalog
*/
public TableMeta getTableMeta(String tableName) {
return tables.get(tableName);
}

/**
* Returns the file path for a specified table.
* @param tableName The name of the table
* @return The Path object representing the table's data file location
*/
public Path getDBLocation(String tableName) {
return dbLocations.get(tableName);
TableMeta meta = tables.get(tableName);
return meta == null ? null : meta.path();
}

/**
Expand All @@ -178,7 +226,8 @@ public Path getDBLocation(String tableName) {
* @return A map from column names to their positions (indices)
*/
public Map<String, Integer> getDBSchemata(String tableName) {
return dbSchemata.get(tableName);
TableMeta meta = tables.get(tableName);
return meta == null ? null : meta.schema();
}

/**
Expand All @@ -187,7 +236,8 @@ public Map<String, Integer> getDBSchemata(String tableName) {
* @return A list of ColumnType values in column order, or null if table not found
*/
public List<ColumnType> getColumnTypes(String tableName) {
return dbColumnTypes.get(tableName);
TableMeta meta = tables.get(tableName);
return meta == null ? null : meta.types();
}

/**
Expand All @@ -196,7 +246,7 @@ public List<ColumnType> getColumnTypes(String tableName) {
* @return The sorted list of loaded table names
*/
public List<String> getTableNames() {
List<String> names = new ArrayList<>(dbLocations.keySet());
List<String> names = new ArrayList<>(tables.keySet());
Collections.sort(names);
return names;
}
Expand All @@ -220,7 +270,7 @@ public QueryExecutionException unknownTable(String tableName) {
* @return true if the table exists, false otherwise
*/
public boolean tableExists(String tableName) {
return (dbLocations.containsKey(tableName) && dbSchemata.containsKey(tableName));
return tables.containsKey(tableName);
}

/**
Expand All @@ -230,9 +280,10 @@ public boolean tableExists(String tableName) {
* @return true if the column exists in the table, false otherwise
*/
public boolean columnExists(String tableName, String columnName) {
if (!tableExists(tableName)) {
TableMeta meta = tables.get(tableName);
if (meta == null) {
return false;
}
return dbSchemata.get(tableName).containsKey(columnName.toLowerCase());
return meta.schema().containsKey(columnName.toLowerCase());
}
}
Loading
Loading