diff --git a/src/iceberg/avro/avro_data_util.cc b/src/iceberg/avro/avro_data_util.cc index 17bbb394b..fb2f58bd1 100644 --- a/src/iceberg/avro/avro_data_util.cc +++ b/src/iceberg/avro/avro_data_util.cc @@ -457,6 +457,11 @@ Status AppendFieldToBuilder(const ::avro::NodePtr& avro_node, const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { size_t branch = avro_datum.unionBranch(); if (avro_node->leafAt(branch)->type() == ::avro::AVRO_NULL) { @@ -507,6 +512,9 @@ Status ExtractDatumFromArray(const ::arrow::Array& array, int64_t index, } if (array.IsNull(index)) { + if (datum->type() == ::avro::AVRO_NULL) { + return {}; + } if (!datum->isUnion()) [[unlikely]] { return InvalidSchema("Cannot extract null to non-union type: {}", ::avro::toString(datum->type())); diff --git a/src/iceberg/avro/avro_direct_decoder.cc b/src/iceberg/avro/avro_direct_decoder.cc index cb4e869cc..19ce77bbd 100644 --- a/src/iceberg/avro/avro_direct_decoder.cc +++ b/src/iceberg/avro/avro_direct_decoder.cc @@ -588,6 +588,12 @@ Status DecodeFieldToBuilder(const ::avro::NodePtr& avro_node, ::avro::Decoder& d const SchemaField& projected_field, const arrow::MetadataColumnContext& metadata_context, ::arrow::ArrayBuilder* array_builder, DecodeContext& ctx) { + if (projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_RETURN_UNEXPECTED(SkipAvroValue(avro_node, decoder)); + ICEBERG_ARROW_RETURN_NOT_OK(array_builder->AppendNull()); + return {}; + } + if (avro_node->type() == ::avro::AVRO_UNION) { const size_t branch_index = decoder.decodeUnionIndex(); diff --git a/src/iceberg/avro/avro_direct_encoder.cc b/src/iceberg/avro/avro_direct_encoder.cc index caab7f699..5dcfd2511 100644 --- a/src/iceberg/avro/avro_direct_encoder.cc +++ b/src/iceberg/avro/avro_direct_encoder.cc @@ -80,15 +80,16 @@ Status EncodeArrowToAvro(const ::avro::NodePtr& avro_node, ::avro::Encoder& enco return EncodeArrowToAvro(branches.value_node, encoder, type, array, row_index, ctx); } + if (avro_node->type() == ::avro::AVRO_NULL) { + encoder.encodeNull(); + return {}; + } + if (is_null) { return InvalidArgument("Null value in non-nullable field"); } switch (avro_node->type()) { - case ::avro::AVRO_NULL: - encoder.encodeNull(); - return {}; - case ::avro::AVRO_BOOL: { const auto& bool_array = internal::checked_cast(array); diff --git a/src/iceberg/avro/avro_schema_util.cc b/src/iceberg/avro/avro_schema_util.cc index 3d61d283f..4ecd87ebc 100644 --- a/src/iceberg/avro/avro_schema_util.cc +++ b/src/iceberg/avro/avro_schema_util.cc @@ -237,6 +237,11 @@ Status ToAvroNodeVisitor::Visit(const BinaryType& type, ::avro::NodePtr* node) { return {}; } +Status ToAvroNodeVisitor::Visit(const UnknownType&, ::avro::NodePtr* node) { + *node = std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL); + return {}; +} + Status ToAvroNodeVisitor::Visit(const StructType& type, ::avro::NodePtr* node) { *node = std::make_shared<::avro::NodeRecord>(); @@ -338,7 +343,7 @@ Status ToAvroNodeVisitor::Visit(const SchemaField& field, ::avro::NodePtr* node) field_ids_.push(field.field_id()); ICEBERG_RETURN_UNEXPECTED(VisitTypeInline(*field.type(), /*visitor=*/this, node)); - if (field.optional()) { + if (field.optional() && (*node)->type() != ::avro::AVRO_NULL) { ::avro::MultiLeaves union_types; union_types.add(std::make_shared<::avro::NodePrimitive>(::avro::AVRO_NULL)); union_types.add(std::move(*node)); @@ -383,8 +388,8 @@ Status HasIdVisitor::Visit(const ::avro::NodePtr& node) { case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: - return {}; case ::avro::AVRO_NULL: + return {}; case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type: {}", static_cast(node->type())); @@ -512,6 +517,10 @@ Result GetFieldId(const ::avro::NodePtr& node, size_t field_idx) { Status ValidateAvroSchemaEvolution(const Type& expected_type, const ::avro::NodePtr& avro_node) { + if (avro_node->type() == ::avro::AVRO_NULL) { + return {}; + } + switch (expected_type.type_id()) { case TypeId::kBoolean: if (avro_node->type() == ::avro::AVRO_BOOL) { @@ -615,6 +624,8 @@ Status ValidateAvroSchemaEvolution(const Type& expected_type, return {}; } break; + case TypeId::kUnknown: + return {}; default: break; } @@ -650,6 +661,35 @@ Result ProjectNested(const Type& expected_type, const ::avro::NodePtr& avro_node, bool prune_source); +Result ProjectField(const SchemaField& expected_field, + const ::avro::NodePtr& avro_node, + size_t source_index, bool prune_source) { + const Type& expected_type = *expected_field.type(); + ::avro::NodePtr field_node; + ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node, &field_node)); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || + field_node->type() == ::avro::AVRO_NULL) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with ID: {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, field_node, prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED(ValidateAvroSchemaEvolution(expected_type, field_node)); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct(const StructType& struct_type, const ::avro::NodePtr& avro_node, bool prune_source) { @@ -685,18 +725,9 @@ Result ProjectStruct(const StructType& struct_type, FieldProjection child_projection; if (auto iter = node_info_map.find(field_id); iter != node_info_map.cend()) { - ::avro::NodePtr field_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(iter->second.field_node, &field_node)); - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - child_projection, - ProjectNested(*expected_field.type(), field_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_field.type(), field_node)); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, iter->second.field_node, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { @@ -733,20 +764,9 @@ Result ProjectList(const ListType& list_type, } FieldProjection element_projection; - ::avro::NodePtr element_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(avro_node->leafAt(0), &element_node)); - if (expected_element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE( - element_projection, - ProjectNested(*expected_element_field.type(), element_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_element_field.type(), element_node)); - } - - // Set the element projection metadata but preserve its children - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(element_projection, + ProjectField(expected_element_field, avro_node->leafAt(0), + /*source_index*/ size_t{0}, prune_source)); FieldProjection result; result.children.emplace_back(std::move(element_projection)); @@ -802,18 +822,10 @@ Result ProjectMap(const MapType& map_type, for (size_t i = 0; i < map_node->leaves(); ++i) { FieldProjection sub_projection; - ::avro::NodePtr sub_node; - ICEBERG_RETURN_UNEXPECTED(UnwrapUnion(map_node->leafAt(i), &sub_node)); const auto& expected_sub_field = map_type.fields()[i]; - if (expected_sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, ProjectNested(*expected_sub_field.type(), - sub_node, prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateAvroSchemaEvolution(*expected_sub_field.type(), sub_node)); - } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; + ICEBERG_ASSIGN_OR_RAISE( + sub_projection, + ProjectField(expected_sub_field, map_node->leafAt(i), i, prune_source)); result.children.emplace_back(std::move(sub_projection)); } @@ -1049,9 +1061,9 @@ Result<::avro::NodePtr> MakeAvroNodeWithFieldIds(const ::avro::NodePtr& original case ::avro::AVRO_STRING: case ::avro::AVRO_BYTES: case ::avro::AVRO_FIXED: + case ::avro::AVRO_NULL: // For primitive types, just return a copy return original_node; - case ::avro::AVRO_NULL: case ::avro::AVRO_ENUM: default: return InvalidSchema("Unsupported Avro type for field ID application: {}", diff --git a/src/iceberg/avro/avro_schema_util_internal.h b/src/iceberg/avro/avro_schema_util_internal.h index e3b7a7ffd..f5049e5cf 100644 --- a/src/iceberg/avro/avro_schema_util_internal.h +++ b/src/iceberg/avro/avro_schema_util_internal.h @@ -58,6 +58,7 @@ class ToAvroNodeVisitor { Status Visit(const UuidType& type, ::avro::NodePtr* node); Status Visit(const FixedType& type, ::avro::NodePtr* node); Status Visit(const BinaryType& type, ::avro::NodePtr* node); + Status Visit(const UnknownType&, ::avro::NodePtr*); Status Visit(const StructType& type, ::avro::NodePtr* node); Status Visit(const ListType& type, ::avro::NodePtr* node); Status Visit(const MapType& type, ::avro::NodePtr* node); diff --git a/src/iceberg/json_serde.cc b/src/iceberg/json_serde.cc index 3944e510c..550d0f07e 100644 --- a/src/iceberg/json_serde.cc +++ b/src/iceberg/json_serde.cc @@ -377,6 +377,8 @@ nlohmann::json ToJson(const Type& type) { } case TypeId::kUuid: return "uuid"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); } @@ -502,6 +504,8 @@ Result> TypeFromJson(const nlohmann::json& json) { return std::make_unique(); } else if (type_str == "uuid") { return std::make_unique(); + } else if (type_str == "unknown") { + return std::make_unique(); } else if (type_str.starts_with("fixed")) { std::regex fixed_regex(R"(fixed\[\s*(\d+)\s*\])"); std::smatch match; @@ -949,6 +953,7 @@ Result> ParseSchemas( for (const auto& schema_json : schema_array) { ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr schema, SchemaFromJson(schema_json)); + ICEBERG_RETURN_UNEXPECTED(schema->Validate(format_version)); if (schema->schema_id() == current_schema_id) { current_schema = schema; } @@ -965,6 +970,7 @@ Result> ParseSchemas( ICEBERG_ASSIGN_OR_RAISE(auto schema_json, GetJsonValue(json, kSchema)); ICEBERG_ASSIGN_OR_RAISE(current_schema, SchemaFromJson(schema_json)); + ICEBERG_RETURN_UNEXPECTED(current_schema->Validate(format_version)); current_schema_id = current_schema->schema_id(); schemas.push_back(current_schema); } diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc index 43efd1cbd..0c7c6c2ca 100644 --- a/src/iceberg/parquet/parquet_data_util.cc +++ b/src/iceberg/parquet/parquet_data_util.cc @@ -140,6 +140,12 @@ Result> ProjectStructArray( projected_arrays.emplace_back(std::move(projected_array)); } + if (projected_arrays.empty()) { + return std::make_shared<::arrow::StructArray>( + output_struct_type, struct_array->length(), projected_arrays, + struct_array->null_bitmap(), struct_array->null_count(), struct_array->offset()); + } + ICEBERG_ARROW_ASSIGN_OR_RETURN( auto output_array, ::arrow::StructArray::Make(projected_arrays, output_struct_type->fields(), @@ -166,7 +172,14 @@ Result> ProjectListArrayImpl( const auto& output_element_type = output_list_type->value_type(); std::shared_ptr<::arrow::Array> projected_values; - if (element_field.type()->is_nested()) { + if (element_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_values, + MakeNullArray(output_element_type, list_array->values()->length(), pool)); + } else if (element_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported list element projection kind: {}", + ToString(element_projection.kind)); + } else if (element_field.type()->is_nested()) { const auto& nested_type = internal::checked_cast(*element_field.type()); ICEBERG_ASSIGN_OR_RAISE( @@ -219,7 +232,14 @@ Result> ProjectMapArray( // Project keys std::shared_ptr<::arrow::Array> projected_keys; - if (key_type->is_nested()) { + if (key_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_keys, + MakeNullArray(output_map_type->key_type(), map_array->keys()->length(), pool)); + } else if (key_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map key projection kind: {}", + ToString(key_projection.kind)); + } else if (key_type->is_nested()) { const auto& nested_type = internal::checked_cast(*key_type); ICEBERG_ASSIGN_OR_RAISE( projected_keys, @@ -233,7 +253,14 @@ Result> ProjectMapArray( // Project values std::shared_ptr<::arrow::Array> projected_items; - if (value_type->is_nested()) { + if (value_projection.kind == FieldProjection::Kind::kNull) { + ICEBERG_ASSIGN_OR_RAISE( + projected_items, + MakeNullArray(output_map_type->item_type(), map_array->items()->length(), pool)); + } else if (value_projection.kind != FieldProjection::Kind::kProjected) { + return NotImplemented("Unsupported map value projection kind: {}", + ToString(value_projection.kind)); + } else if (value_type->is_nested()) { const auto& nested_type = internal::checked_cast(*value_type); ICEBERG_ASSIGN_OR_RAISE( projected_items, diff --git a/src/iceberg/parquet/parquet_schema_util.cc b/src/iceberg/parquet/parquet_schema_util.cc index 849bbd1f8..39e321d9f 100644 --- a/src/iceberg/parquet/parquet_schema_util.cc +++ b/src/iceberg/parquet/parquet_schema_util.cc @@ -59,10 +59,61 @@ std::optional GetFieldId(const ::parquet::arrow::SchemaField& parquet_f return FieldIdFromMetadata(parquet_field.field->metadata()); } -// TODO(gangwu): support v3 unknown type +bool IsNullPhysicalField(const ::parquet::arrow::SchemaField& parquet_field) { + return parquet_field.field->type()->id() == ::arrow::Type::NA; +} + +bool HasSelectedColumn(const FieldProjection& projection) { + if (projection.attributes) { + const auto& attributes = + internal::checked_cast(*projection.attributes); + if (attributes.column_id) { + return true; + } + } + return std::ranges::any_of(projection.children, HasSelectedColumn); +} + +std::optional FirstColumnIndex( + const ::parquet::arrow::SchemaField& parquet_field) { + if (parquet_field.column_index >= 0) { + return parquet_field.column_index; + } + for (const auto& child : parquet_field.children) { + if (auto column_index = FirstColumnIndex(child)) { + return column_index; + } + } + return std::nullopt; +} + +// Pick a physical column as an anchor when all children are null-projected, +// so that Parquet readers can still locate row boundaries. +void SelectAnchorColumnIfEmpty( + FieldProjection* projection, + const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { + if (HasSelectedColumn(*projection)) { + return; + } + for (const auto& parquet_field : parquet_fields) { + if (auto column_index = FirstColumnIndex(parquet_field)) { + projection->attributes = + std::make_shared(column_index.value()); + return; + } + } +} + +} // namespace + Status ValidateParquetSchemaEvolution( const Type& expected_type, const ::parquet::arrow::SchemaField& parquet_field) { const auto& arrow_type = parquet_field.field->type(); + // Some Parquet files may contain null-only physical fields. Allow reading them as + // any optional projected field type. + if (arrow_type->id() == ::arrow::Type::NA) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kBoolean: if (arrow_type->id() == ::arrow::Type::BOOL) { @@ -186,6 +237,8 @@ Status ValidateParquetSchemaEvolution( } } break; + case TypeId::kUnknown: + return {}; case TypeId::kStruct: if (arrow_type->id() == ::arrow::Type::STRUCT) { return {}; @@ -209,11 +262,42 @@ Status ValidateParquetSchemaEvolution( expected_type, arrow_type->ToString()); } +namespace { + // Forward declaration Result ProjectNested( const Type& nested_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields); +Result ProjectField(const SchemaField& expected_field, + const ::parquet::arrow::SchemaField& parquet_field, + size_t source_index) { + const Type& expected_type = *expected_field.type(); + + FieldProjection projection; + if (expected_type.type_id() == TypeId::kUnknown || IsNullPhysicalField(parquet_field)) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + ICEBERG_RETURN_UNEXPECTED(ValidateParquetSchemaEvolution(expected_type, parquet_field)); + + if (expected_type.is_nested()) { + ICEBERG_ASSIGN_OR_RAISE(projection, + ProjectNested(expected_type, parquet_field.children)); + } else { + projection.attributes = + std::make_shared(parquet_field.column_index); + } + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectStruct( const StructType& struct_type, const std::vector<::parquet::arrow::SchemaField>& parquet_fields) { @@ -248,17 +332,8 @@ Result ProjectStruct( if (auto iter = field_context_map.find(field_id); iter != field_context_map.cend()) { const auto& parquet_field = iter->second.parquet_field; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*field.type(), parquet_field)); - if (field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*field.type(), parquet_field.children)); - } else { - child_projection.attributes = - std::make_shared(parquet_field.column_index); - } - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE( + child_projection, ProjectField(field, parquet_field, iter->second.local_index)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (field.optional()) { @@ -270,6 +345,7 @@ Result ProjectStruct( result.children.emplace_back(std::move(child_projection)); } + SelectAnchorColumnIfEmpty(&result, parquet_fields); PruneFieldProjection(result); return result; } @@ -294,23 +370,12 @@ Result ProjectList( element_field.field_id(), element_field_id.value()); } - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*element_field.type(), parquet_field)); - - FieldProjection element_projection; - if (element_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(element_projection, - ProjectNested(*element_field.type(), parquet_field.children)); - } else { - element_projection.attributes = - std::make_shared(parquet_field.column_index); - } - - element_projection.kind = FieldProjection::Kind::kProjected; - element_projection.from = size_t{0}; + ICEBERG_ASSIGN_OR_RAISE(auto element_projection, + ProjectField(element_field, parquet_field, size_t{0})); FieldProjection result; result.children.emplace_back(std::move(element_projection)); + SelectAnchorColumnIfEmpty(&result, parquet_fields); return result; } @@ -346,23 +411,20 @@ Result ProjectMap( result.children.reserve(2); for (size_t i = 0; i < parquet_fields.size(); ++i) { - FieldProjection sub_projection; const auto& sub_node = parquet_fields[i]; const auto& sub_field = map_type.fields()[i]; - ICEBERG_RETURN_UNEXPECTED( - ValidateParquetSchemaEvolution(*sub_field.type(), sub_node)); - if (sub_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(sub_projection, - ProjectNested(*sub_field.type(), sub_node.children)); - } else { - sub_projection.attributes = - std::make_shared(sub_node.column_index); + ICEBERG_ASSIGN_OR_RAISE(auto sub_projection, ProjectField(sub_field, sub_node, i)); + if (sub_projection.kind == FieldProjection::Kind::kNull && + !HasSelectedColumn(sub_projection)) { + if (auto column_index = FirstColumnIndex(sub_node)) { + sub_projection.attributes = + std::make_shared(column_index.value()); + } } - sub_projection.kind = FieldProjection::Kind::kProjected; - sub_projection.from = i; result.children.emplace_back(std::move(sub_projection)); } + SelectAnchorColumnIfEmpty(&result, parquet_fields); return result; } diff --git a/src/iceberg/parquet/parquet_schema_util_internal.h b/src/iceberg/parquet/parquet_schema_util_internal.h index 8e06b0bcf..567069291 100644 --- a/src/iceberg/parquet/parquet_schema_util_internal.h +++ b/src/iceberg/parquet/parquet_schema_util_internal.h @@ -62,4 +62,8 @@ std::vector SelectedColumnIndices(const SchemaProjection& projection); /// \return True if the Parquet schema has field IDs, false otherwise. bool HasFieldIds(const ::parquet::schema::NodePtr& root_node); +/// \brief Validate whether a projected Iceberg type is compatible with a Parquet field. +Status ValidateParquetSchemaEvolution(const Type& expected_type, + const ::parquet::arrow::SchemaField& parquet_field); + } // namespace iceberg::parquet diff --git a/src/iceberg/row/arrow_array_wrapper.cc b/src/iceberg/row/arrow_array_wrapper.cc index e97293bcd..0d6d48c15 100644 --- a/src/iceberg/row/arrow_array_wrapper.cc +++ b/src/iceberg/row/arrow_array_wrapper.cc @@ -44,6 +44,8 @@ Result ExtractValue(const ArrowSchema* schema, const ArrowArray* array, } switch (array_view->storage_type) { + case NANOARROW_TYPE_NA: + return std::monostate{}; case NANOARROW_TYPE_BOOL: return static_cast(ArrowArrayViewGetIntUnsafe(array_view, index)); case NANOARROW_TYPE_INT32: diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc index 5fdd47998..5e60b551f 100644 --- a/src/iceberg/schema.cc +++ b/src/iceberg/schema.cc @@ -35,6 +35,44 @@ namespace iceberg { +namespace { + +Status ValidateFieldNullability(const Type& type) { + auto validate_field = [&](const SchemaField& field) -> Status { + ICEBERG_PRECHECK(field.optional() || field.type()->type_id() != TypeId::kUnknown, + "Unknown type field '{}' must be optional", field.name()); + return ValidateFieldNullability(*field.type()); + }; + + switch (type.type_id()) { + case TypeId::kStruct: { + const auto& struct_type = static_cast(type); + for (const auto& field : struct_type.fields()) { + ICEBERG_RETURN_UNEXPECTED(validate_field(field)); + } + return {}; + } + case TypeId::kList: { + const auto& list_type = static_cast(type); + const auto& element = list_type.element(); + return validate_field(element); + } + case TypeId::kMap: { + const auto& map_type = static_cast(type); + const auto& key = map_type.key(); + const auto& value = map_type.value(); + ICEBERG_PRECHECK(key.type()->type_id() != TypeId::kUnknown, + "Map 'key' cannot be unknown type"); + ICEBERG_RETURN_UNEXPECTED(ValidateFieldNullability(*key.type())); + return validate_field(value); + } + default: + return {}; + } +} + +} // namespace + Schema::Schema(std::vector fields, int32_t schema_id) : StructType(std::move(fields)), schema_id_(schema_id), @@ -282,6 +320,8 @@ bool Schema::SameSchema(const Schema& other) const { } Status Schema::Validate(int32_t format_version) const { + ICEBERG_RETURN_UNEXPECTED(ValidateFieldNullability(*this)); + // Get all fields including nested ones ICEBERG_ASSIGN_OR_RAISE(auto id_to_field, cache_->GetIdToFieldMap()); diff --git a/src/iceberg/schema_internal.cc b/src/iceberg/schema_internal.cc index bdd5b859f..c32ceb2a6 100644 --- a/src/iceberg/schema_internal.cc +++ b/src/iceberg/schema_internal.cc @@ -150,6 +150,9 @@ ArrowErrorCode ToArrowSchema(const Type& type, bool optional, std::string_view n ArrowMetadataBuilderAppend(&metadata_buffer, ArrowCharView(kArrowExtensionName), ArrowCharView(kArrowUuidExtensionName))); } break; + case TypeId::kUnknown: + NANOARROW_RETURN_NOT_OK(ArrowSchemaSetType(schema, NANOARROW_TYPE_NA)); + break; } if (!name.empty()) { @@ -217,6 +220,9 @@ Result> FromArrowSchema(const ArrowSchema& schema) { auto field_id = GetFieldId(schema); bool is_optional = (schema.flags & ARROW_FLAG_NULLABLE) != 0; + if (field_type->type_id() == TypeId::kUnknown && !is_optional) { + return InvalidSchema("Arrow null field '{}' must be nullable", schema.name); + } return std::make_unique(field_id, schema.name, std::move(field_type), is_optional); }; @@ -312,6 +318,8 @@ Result> FromArrowSchema(const ArrowSchema& schema) { } return iceberg::fixed(schema_view.fixed_size); } + case NANOARROW_TYPE_NA: + return iceberg::unknown(); default: return InvalidSchema("Unsupported Arrow type: {}", ArrowTypeString(schema_view.type)); diff --git a/src/iceberg/schema_util.cc b/src/iceberg/schema_util.cc index 4acdab631..4ff678fc6 100644 --- a/src/iceberg/schema_util.cc +++ b/src/iceberg/schema_util.cc @@ -49,6 +49,9 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ if (expected_type == source_type) { return {}; } + if (source_type.type_id() == TypeId::kUnknown && expected_type.is_primitive()) { + return {}; + } switch (expected_type.type_id()) { case TypeId::kLong: { @@ -79,6 +82,50 @@ Status ValidateSchemaEvolution(const Type& expected_type, const Type& source_typ return NotSupported("Cannot read {} from {}", expected_type, source_type); } +Result ProjectNested(const Type& expected_type, const Type& source_type, + bool prune_source); + +Result ProjectField(const SchemaField& expected_field, + const SchemaField& source_field, size_t source_index, + bool prune_source) { + FieldProjection projection; + + if (expected_field.type()->type_id() == TypeId::kUnknown) { + if (!expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (source_field.type()->type_id() == TypeId::kUnknown && !expected_field.optional()) { + return InvalidSchema("Cannot project required field with id {} as null", + expected_field.field_id()); + } + if (source_field.type()->type_id() == TypeId::kUnknown && + expected_field.type()->is_nested()) { + projection.kind = FieldProjection::Kind::kNull; + return projection; + } + + if (expected_field.type()->is_nested()) { + ICEBERG_ASSIGN_OR_RAISE( + projection, + ProjectNested(*expected_field.type(), *source_field.type(), prune_source)); + } else { + ICEBERG_RETURN_UNEXPECTED( + ValidateSchemaEvolution(*expected_field.type(), *source_field.type())); + } + + // If `prune_source` is false, all fields will be read so the local index is exactly + // the position to read data. Otherwise, the local index is computed by pruning all + // non-projected fields. + projection.from = source_index; + projection.kind = FieldProjection::Kind::kProjected; + return projection; +} + Result ProjectNested(const Type& expected_type, const Type& source_type, bool prune_source) { if (!expected_type.is_nested()) { @@ -120,19 +167,9 @@ Result ProjectNested(const Type& expected_type, const Type& sou FieldProjection child_projection; if (auto iter = source_field_map.find(field_id); iter != source_field_map.cend()) { - if (expected_field.type()->is_nested()) { - ICEBERG_ASSIGN_OR_RAISE(child_projection, - ProjectNested(*expected_field.type(), - *iter->second.field->type(), prune_source)); - } else { - ICEBERG_RETURN_UNEXPECTED( - ValidateSchemaEvolution(*expected_field.type(), *iter->second.field->type())); - } - // If `prune_source` is false, all fields will be read so the local index - // is exactly the position to read data. Otherwise, the local index is computed - // by pruning all non-projected fields - child_projection.from = iter->second.local_index; - child_projection.kind = FieldProjection::Kind::kProjected; + ICEBERG_ASSIGN_OR_RAISE(child_projection, + ProjectField(expected_field, *iter->second.field, + iter->second.local_index, prune_source)); } else if (MetadataColumns::IsMetadataColumn(field_id)) { child_projection.kind = FieldProjection::Kind::kMetadata; } else if (expected_field.optional()) { diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h index 2f0c7e181..335fedadc 100644 --- a/src/iceberg/table_metadata.h +++ b/src/iceberg/table_metadata.h @@ -80,6 +80,7 @@ struct ICEBERG_EXPORT TableMetadata { static inline const std::unordered_map kMinFormatVersions = { {TypeId::kTimestampNs, 3}, {TypeId::kTimestampTzNs, 3}, + {TypeId::kUnknown, 3}, }; /// An integer version number for the format diff --git a/src/iceberg/test/arrow_test.cc b/src/iceberg/test/arrow_test.cc index dcfdb6b56..2e2a80096 100644 --- a/src/iceberg/test/arrow_test.cc +++ b/src/iceberg/test/arrow_test.cc @@ -107,7 +107,9 @@ INSTANTIATE_TEST_SUITE_P( ToArrowSchemaParam{.iceberg_type = iceberg::uuid(), .arrow_type = ::arrow::extension::uuid()}, ToArrowSchemaParam{.iceberg_type = iceberg::fixed(20), - .arrow_type = ::arrow::fixed_size_binary(20)})); + .arrow_type = ::arrow::fixed_size_binary(20)}, + ToArrowSchemaParam{.iceberg_type = iceberg::unknown(), + .arrow_type = ::arrow::null()})); namespace { @@ -233,6 +235,81 @@ TEST(ToArrowSchemaTest, MapType) { /*nullable=*/true, kValueFieldId)); } +TEST(ToArrowSchemaTest, NestedUnknownFieldsRoundTrip) { + Schema schema( + { + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", + iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", + iceberg::unknown()))), + }, + /*schema_id=*/0); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(schema, &arrow_c_schema), IsOk()); + + auto imported_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + ASSERT_EQ(imported_schema->num_fields(), 3); + + auto profile_type = + std::static_pointer_cast<::arrow::StructType>(imported_schema->field(0)->type()); + ASSERT_EQ(profile_type->num_fields(), 1); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*profile_type->field(0), ::arrow::Type::NA, + "mystery", /*nullable=*/true, + /*field_id=*/2)); + + auto mysteries_type = + std::static_pointer_cast<::arrow::ListType>(imported_schema->field(1)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*mysteries_type->value_field(), + ::arrow::Type::NA, "element", + /*nullable=*/true, /*field_id=*/4)); + + auto properties_type = + std::static_pointer_cast<::arrow::MapType>(imported_schema->field(2)->type()); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->key_field(), + ::arrow::Type::STRING, "key", + /*nullable=*/false, /*field_id=*/6)); + ASSERT_NO_FATAL_FAILURE(CheckArrowField(*properties_type->item_field(), + ::arrow::Type::NA, "value", + /*nullable=*/true, /*field_id=*/7)); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*imported_schema, &exported_schema).ok()); + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ASSERT_THAT(schema_result, IsOk()); + ArrowSchemaRelease(&exported_schema); + + const auto& round_tripped_schema = *schema_result.value(); + ASSERT_EQ(round_tripped_schema.fields().size(), 3); + + const auto* profile = + dynamic_cast(round_tripped_schema.fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* mysteries = + dynamic_cast(round_tripped_schema.fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + + const auto* properties = + dynamic_cast(round_tripped_schema.fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); +} + struct FromArrowSchemaParam { std::shared_ptr arrow_type; bool optional = true; @@ -307,7 +384,51 @@ INSTANTIATE_TEST_SUITE_P( FromArrowSchemaParam{.arrow_type = ::arrow::extension::uuid(), .iceberg_type = iceberg::uuid()}, FromArrowSchemaParam{.arrow_type = ::arrow::fixed_size_binary(20), - .iceberg_type = iceberg::fixed(20)})); + .iceberg_type = iceberg::fixed(20)}, + FromArrowSchemaParam{.arrow_type = ::arrow::null(), + .iceberg_type = iceberg::unknown()})); + +TEST(FromArrowSchemaTest, RejectRequiredNullFieldAsUnknown) { + auto metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto arrow_schema = ::arrow::schema({::arrow::field( + "mystery", ::arrow::null(), /*nullable=*/false, std::move(metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'mystery' must be nullable")); +} + +TEST(FromArrowSchemaTest, RejectRequiredNullListElementAsUnknown) { + auto list_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "1"}}); + auto element_metadata = + ::arrow::key_value_metadata(std::unordered_map{ + {std::string(kParquetFieldIdKey), "2"}}); + auto element_field = ::arrow::field("element", ::arrow::null(), /*nullable=*/false, + std::move(element_metadata)); + auto arrow_schema = + ::arrow::schema({::arrow::field("mysteries", ::arrow::list(element_field), + /*nullable=*/true, std::move(list_metadata))}); + + ArrowSchema exported_schema; + ASSERT_TRUE(::arrow::ExportSchema(*arrow_schema, &exported_schema).ok()); + + auto schema_result = FromArrowSchema(exported_schema, /*schema_id=*/0); + ArrowSchemaRelease(&exported_schema); + + ASSERT_THAT(schema_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(schema_result, + HasErrorMessage("Arrow null field 'element' must be nullable")); +} TEST(FromArrowSchemaTest, StructType) { constexpr int32_t kStructFieldId = 1; diff --git a/src/iceberg/test/avro_data_test.cc b/src/iceberg/test/avro_data_test.cc index 2979ad9bd..7731f58d3 100644 --- a/src/iceberg/test/avro_data_test.cc +++ b/src/iceberg/test/avro_data_test.cc @@ -1241,6 +1241,27 @@ TEST(ExtractDatumFromArrayTest, NullHandling) { EXPECT_EQ(record2.fieldAt(0).type(), ::avro::AVRO_NULL); } +TEST(ExtractDatumFromArrayTest, UnknownType) { + Schema iceberg_schema({SchemaField::MakeOptional(1, "a", unknown())}); + ::avro::NodePtr avro_node; + ASSERT_THAT(ToAvroNodeVisitor{}.Visit(iceberg_schema, &avro_node), IsOk()); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(iceberg_schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportSchema(&arrow_c_schema).ValueOrDie(); + auto arrow_struct_type = std::make_shared<::arrow::StructType>(arrow_schema->fields()); + + auto arrow_array = + ::arrow::json::ArrayFromJSONString(arrow_struct_type, R"([{"a": null}])") + .ValueOrDie(); + + ::avro::GenericDatum datum(avro_node); + ASSERT_THAT(ExtractDatumFromArray(*arrow_array, 0, &datum), IsOk()); + + const auto& record = datum.value<::avro::GenericRecord>(); + EXPECT_EQ(record.fieldAt(0).type(), ::avro::AVRO_NULL); +} + struct RoundTripParam { std::string name; std::shared_ptr iceberg_schema; diff --git a/src/iceberg/test/avro_schema_test.cc b/src/iceberg/test/avro_schema_test.cc index dc2cb0a51..ffb668abc 100644 --- a/src/iceberg/test/avro_schema_test.cc +++ b/src/iceberg/test/avro_schema_test.cc @@ -250,6 +250,12 @@ TEST(ToAvroNodeVisitorTest, BinaryType) { EXPECT_EQ(node->type(), ::avro::AVRO_BYTES); } +TEST(ToAvroNodeVisitorTest, UnknownType) { + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(UnknownType{}, &node), IsOk()); + EXPECT_EQ(node->type(), ::avro::AVRO_NULL); +} + TEST(ToAvroNodeVisitorTest, StructType) { StructType struct_type{{SchemaField{/*field_id=*/1, "bool_field", iceberg::boolean(), /*optional=*/false}, @@ -276,6 +282,70 @@ TEST(ToAvroNodeVisitorTest, StructType) { EXPECT_EQ(node->leafAt(1)->leafAt(1)->type(), ::avro::AVRO_INT); } +TEST(ToAvroNodeVisitorTest, OptionalUnknownField) { + StructType struct_type{{SchemaField{/*field_id=*/1, "mystery", iceberg::unknown(), + /*optional=*/true}}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 1); + EXPECT_EQ(node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_EQ(node->customAttributes(), 1); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(node, /*index=*/0, /*field_id=*/1)); +} + +TEST(ToAvroNodeVisitorTest, NestedUnknownFields) { + StructType struct_type{ + {SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", iceberg::unknown())))}}; + + ::avro::NodePtr node; + EXPECT_THAT(ToAvroNodeVisitor{}.Visit(struct_type, &node), IsOk()); + + ASSERT_EQ(node->leaves(), 3); + auto profile_union = node->leafAt(0); + ASSERT_EQ(profile_union->type(), ::avro::AVRO_UNION); + auto profile_node = profile_union->leafAt(1); + ASSERT_EQ(profile_node->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(profile_node->leaves(), 1); + EXPECT_EQ(profile_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(profile_node, /*index=*/0, /*field_id=*/2)); + + auto list_union = node->leafAt(1); + ASSERT_EQ(list_union->type(), ::avro::AVRO_UNION); + auto list_node = list_union->leafAt(1); + ASSERT_EQ(list_node->type(), ::avro::AVRO_ARRAY); + ASSERT_EQ(list_node->leaves(), 1); + EXPECT_EQ(list_node->leafAt(0)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(list_node, /*index=*/0, /*field_id=*/4, + /*key=*/"element-id")); + + auto map_union = node->leafAt(2); + ASSERT_EQ(map_union->type(), ::avro::AVRO_UNION); + auto map_node = map_union->leafAt(1); + ASSERT_EQ(map_node->type(), ::avro::AVRO_MAP); + ASSERT_EQ(map_node->leaves(), 2); + EXPECT_EQ(map_node->leafAt(0)->type(), ::avro::AVRO_STRING); + EXPECT_EQ(map_node->leafAt(1)->type(), ::avro::AVRO_NULL); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/6, + /*key=*/"key-id")); + ASSERT_NO_FATAL_FAILURE(CheckFieldIdAt(map_node, /*index=*/0, /*field_id=*/7, + /*key=*/"value-id")); +} + TEST(ToAvroNodeVisitorTest, StructTypeWithFieldNames) { StructType struct_type{ {SchemaField{/*field_id=*/1, "user-name", iceberg::string(), @@ -480,6 +550,13 @@ TEST(HasIdVisitorTest, HasNoIds) { EXPECT_FALSE(visitor.AllHaveIds()); } +TEST(HasIdVisitorTest, NullType) { + HasIdVisitor visitor; + EXPECT_THAT(visitor.Visit(::avro::compileJsonSchemaFromString("\"null\"")), IsOk()); + EXPECT_TRUE(visitor.HasNoIds()); + EXPECT_FALSE(visitor.AllHaveIds()); +} + TEST(HasIdVisitorTest, RecordWithFieldIds) { const std::string schema_json = R"({ "type": "record", @@ -899,6 +976,146 @@ TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_EQ(std::get<1>(projection.fields[0].from), 0); } +TEST(AvroSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "mystery", "type": "int", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "profile", "type": ["null", { + "type": "record", + "name": "profile_record", + "fields": [ + {"name": "name", "type": ["null", "string"], "field-id": 2}, + {"name": "mystery", "type": ["null", "int"], "field-id": 3} + ] + }], "field-id": 1}, + {"name": "mysteries", "type": ["null", { + "type": "array", + "items": ["null", "int"], + "element-id": 5 + }], "field-id": 4}, + {"name": "properties", "type": ["null", { + "type": "map", + "values": ["null", "int"], + "key-id": 7, + "value-id": 8 + }], "field-id": 6} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[0].children[1].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_EQ(projection.fields[2].children[0].kind, FieldProjection::Kind::kProjected); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); +} + +TEST(AvroSchemaProjectionTest, RejectNullLeafForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "value", "type": "null", "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 1 as null")); +} + +TEST(AvroSchemaProjectionTest, RejectNullListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + std::string avro_schema_json = R"({ + "type": "record", + "name": "iceberg_schema", + "fields": [ + {"name": "numbers", "type": ["null", { + "type": "array", + "items": "null", + "element-id": 101 + }], "field-id": 1} + ] + })"; + auto avro_schema = ::avro::compileJsonSchemaFromString(avro_schema_json); + + auto projection_result = + Project(expected_schema, avro_schema.root(), /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with ID: 101 as null")); +} + TEST(AvroSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { // Create iceberg schema expecting an int Schema expected_schema({ diff --git a/src/iceberg/test/avro_test.cc b/src/iceberg/test/avro_test.cc index b74fe829b..ef86ef9e2 100644 --- a/src/iceberg/test/avro_test.cc +++ b/src/iceberg/test/avro_test.cc @@ -17,7 +17,10 @@ * under the License. */ +#include #include +#include +#include #include #include @@ -30,6 +33,7 @@ #include #include "iceberg/arrow/arrow_io_internal.h" +#include "iceberg/avro/avro_constants.h" #include "iceberg/avro/avro_register.h" #include "iceberg/avro/avro_stream_internal.h" #include "iceberg/avro/avro_writer.h" @@ -45,6 +49,35 @@ namespace iceberg::avro { +namespace { + +::avro::NodePtr UnwrapOptional(const ::avro::NodePtr& node) { + if (node->type() != ::avro::AVRO_UNION) { + return node; + } + + for (size_t i = 0; i < node->leaves(); ++i) { + if (node->leafAt(i)->type() != ::avro::AVRO_NULL) { + return node->leafAt(i); + } + } + return node; +} + +std::optional FieldIdAt(const ::avro::NodePtr& node, size_t index) { + if (index >= node->customAttributes()) { + return std::nullopt; + } + + auto field_id = node->customAttributesAt(index).getAttribute(std::string(kFieldIdProp)); + if (!field_id.has_value()) { + return std::nullopt; + } + return std::stoi(field_id.value()); +} + +} // namespace + class AvroReaderTest : public TempFileTestBase { protected: static void SetUpTestSuite() { RegisterAll(); } @@ -740,6 +773,14 @@ class AvroWriterTest : public ::testing::Test, ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); } + ::avro::ValidSchema PhysicalAvroSchema() { + auto& mock_io = internal::checked_cast(*file_io_); + auto input = mock_io.fs()->OpenInputFile(temp_avro_file_).ValueOrDie(); + auto input_stream = std::make_unique(std::move(input), 1024 * 1024); + ::avro::DataFileReader<::avro::GenericDatum> avro_reader(std::move(input_stream)); + return avro_reader.dataSchema(); + } + std::shared_ptr file_io_; std::string temp_avro_file_; bool skip_datum_{true}; @@ -890,6 +931,148 @@ TEST_P(AvroWriterTest, WriteOptionalFields) { VerifyWrittenData(test_data); } +TEST_P(AvroWriterTest, WritesUnknownFieldsAsAvroNull) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "mystery", unknown()), + SchemaField::MakeOptional(3, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })), + }); + + std::string test_data = R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])"; + + WriteAvroFile(schema, test_data); + + auto avro_schema = PhysicalAvroSchema(); + auto root = avro_schema.root(); + ASSERT_EQ(root->type(), ::avro::AVRO_RECORD); + // Unknown fields are written as AVRO_NULL, not pruned. + ASSERT_EQ(root->leaves(), 3); + EXPECT_EQ(root->nameAt(0), "id"); + EXPECT_EQ(FieldIdAt(root, 0), std::make_optional(1)); + EXPECT_EQ(root->nameAt(1), "mystery"); + EXPECT_EQ(root->leafAt(1)->type(), ::avro::AVRO_NULL); + EXPECT_EQ(FieldIdAt(root, 1), std::make_optional(2)); + EXPECT_EQ(root->nameAt(2), "profile"); + EXPECT_EQ(FieldIdAt(root, 2), std::make_optional(3)); + + auto profile = UnwrapOptional(root->leafAt(2)); + ASSERT_EQ(profile->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(profile->leaves(), 2); + EXPECT_EQ(profile->nameAt(0), "name"); + EXPECT_EQ(FieldIdAt(profile, 0), std::make_optional(4)); + EXPECT_EQ(profile->nameAt(1), "secret"); + EXPECT_EQ(profile->leafAt(1)->type(), ::avro::AVRO_NULL); + EXPECT_EQ(FieldIdAt(profile, 1), std::make_optional(5)); + + VerifyWrittenData(test_data); +} + +TEST_P(AvroWriterTest, WritesUnknownListElementsAndMapValues) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "mysteries", + std::make_shared(SchemaField::MakeOptional( + 3, ListType::kElementName, unknown()))), + SchemaField::MakeRequired( + 4, "properties", + std::make_shared( + SchemaField::MakeRequired(5, MapType::kKeyName, string()), + SchemaField::MakeOptional(6, MapType::kValueName, unknown()))), + }); + + std::string test_data = R"([ + [1, [null, null], [["a", null], ["b", null]]], + [2, [], []], + [3, [null], [["c", null]]] + ])"; + + WriteAvroFile(schema, test_data); + + auto avro_schema = PhysicalAvroSchema(); + auto root = avro_schema.root(); + ASSERT_EQ(root->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(root->leaves(), 3); + + auto mysteries = root->leafAt(1); + ASSERT_EQ(mysteries->type(), ::avro::AVRO_ARRAY); + ASSERT_EQ(mysteries->leaves(), 1); + EXPECT_EQ(mysteries->leafAt(0)->type(), ::avro::AVRO_NULL); + + auto properties = root->leafAt(2); + ASSERT_EQ(properties->type(), ::avro::AVRO_MAP); + ASSERT_EQ(properties->leaves(), 2); + EXPECT_EQ(properties->leafAt(1)->type(), ::avro::AVRO_NULL); + + VerifyWrittenData(test_data); +} + +TEST_P(AvroWriterTest, WritesUnknownFieldsNestedInsideListOrMapStructs) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "events", + std::make_shared(SchemaField::MakeOptional( + 3, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared( + SchemaField::MakeRequired(7, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 8, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(9, "label", string()), + SchemaField::MakeOptional(10, "secret", unknown()), + })))), + }); + + std::string test_data = R"([ + [1, [{"name": "open", "secret": null}, {"name": "close", "secret": null}], [["a", {"label": "A", "secret": null}]]], + [2, [], []] + ])"; + + WriteAvroFile(schema, test_data); + + auto avro_schema = PhysicalAvroSchema(); + auto root = avro_schema.root(); + ASSERT_EQ(root->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(root->leaves(), 3); + + auto events = UnwrapOptional(root->leafAt(1)); + ASSERT_EQ(events->type(), ::avro::AVRO_ARRAY); + auto event = UnwrapOptional(events->leafAt(0)); + ASSERT_EQ(event->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(event->leaves(), 2); + EXPECT_EQ(event->nameAt(0), "name"); + EXPECT_EQ(FieldIdAt(event, 0), std::make_optional(4)); + EXPECT_EQ(event->nameAt(1), "secret"); + EXPECT_EQ(event->leafAt(1)->type(), ::avro::AVRO_NULL); + EXPECT_EQ(FieldIdAt(event, 1), std::make_optional(5)); + + auto properties = UnwrapOptional(root->leafAt(2)); + ASSERT_EQ(properties->type(), ::avro::AVRO_MAP); + ASSERT_EQ(properties->leaves(), 2); + auto value = UnwrapOptional(properties->leafAt(1)); + ASSERT_EQ(value->type(), ::avro::AVRO_RECORD); + ASSERT_EQ(value->leaves(), 2); + EXPECT_EQ(value->nameAt(0), "label"); + EXPECT_EQ(FieldIdAt(value, 0), std::make_optional(9)); + EXPECT_EQ(value->nameAt(1), "secret"); + EXPECT_EQ(value->leafAt(1)->type(), ::avro::AVRO_NULL); + EXPECT_EQ(FieldIdAt(value, 1), std::make_optional(10)); + + VerifyWrittenData(test_data); +} + TEST_P(AvroWriterTest, WriteLargeDataset) { auto schema = std::make_shared(std::vector{ SchemaField::MakeRequired(1, "id", std::make_shared()), diff --git a/src/iceberg/test/metadata_serde_test.cc b/src/iceberg/test/metadata_serde_test.cc index 0d3b5959b..2c2b2bd1e 100644 --- a/src/iceberg/test/metadata_serde_test.cc +++ b/src/iceberg/test/metadata_serde_test.cc @@ -21,7 +21,9 @@ #include #include +#include +#include "iceberg/json_serde_internal.h" #include "iceberg/partition_field.h" #include "iceberg/partition_spec.h" #include "iceberg/schema.h" @@ -404,6 +406,57 @@ TEST(MetadataSerdeTest, DeserializeUnsupportedVersion) { "Cannot read unsupported version"); } +TEST(MetadataSerdeTest, DeserializeRejectsUnknownSchemaBeforeFormatV3) { + auto v1_metadata_json = nlohmann::json::parse(R"({ + "format-version": 1, + "location": "s3://bucket/test/location", + "last-column-id": 1, + "last-updated-ms": 1602638573874, + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "mystery", "type": "unknown", "required": false} + ] + }, + "partition-spec": [] + })"); + + auto result = TableMetadataFromJson(v1_metadata_json); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidSchema)); + EXPECT_THAT(result, HasErrorMessage( + "Invalid type for mystery: unknown is not supported until v3")); + + auto v2_metadata_json = nlohmann::json::parse(R"({ + "format-version": 2, + "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c1", + "location": "s3://bucket/test/location", + "last-sequence-number": 0, + "last-column-id": 1, + "last-updated-ms": 1602638573874, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "mystery", "type": "unknown", "required": false} + ] + } + ], + "current-schema-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "default-spec-id": 0, + "last-partition-id": 999, + "sort-orders": [{"order-id": 0, "fields": []}], + "default-sort-order-id": 0 + })"); + + result = TableMetadataFromJson(v2_metadata_json); + ASSERT_THAT(result, IsError(ErrorKind::kInvalidSchema)); + EXPECT_THAT(result, HasErrorMessage( + "Invalid type for mystery: unknown is not supported until v3")); +} + TEST(MetadataSerdeTest, DeserializeV1MissingSchemaType) { ReadTableMetadataExpectError("TableMetadataV1MissingSchemaType.json", "Missing 'type'"); } diff --git a/src/iceberg/test/parquet_data_test.cc b/src/iceberg/test/parquet_data_test.cc index 9ed28114e..606ad8ca5 100644 --- a/src/iceberg/test/parquet_data_test.cc +++ b/src/iceberg/test/parquet_data_test.cc @@ -316,6 +316,50 @@ TEST(ProjectRecordBatchTest, MapStringToInt) { VerifyProjectRecordBatch(iceberg_schema, iceberg_schema, input_json, input_json)); } +TEST(ProjectRecordBatchTest, NestedUnknownFields) { + Schema projected_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + }); + + Schema source_schema({ + SchemaField::MakeRequired(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeRequired( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeRequired( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + }); + + const std::string input_json = R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + const std::string expected_json = R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]]}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": []} + ])"; + + ASSERT_NO_FATAL_FAILURE(VerifyProjectRecordBatch(projected_schema, source_schema, + input_json, expected_json)); +} + TEST(ProjectRecordBatchTest, MapStringToStruct) { Schema iceberg_schema({ SchemaField::MakeRequired( diff --git a/src/iceberg/test/parquet_schema_test.cc b/src/iceberg/test/parquet_schema_test.cc index a9da3f9f7..75e99ff12 100644 --- a/src/iceberg/test/parquet_schema_test.cc +++ b/src/iceberg/test/parquet_schema_test.cc @@ -17,7 +17,12 @@ * under the License. */ +#include +#include +#include + #include +#include #include #include #include @@ -123,6 +128,30 @@ ::parquet::arrow::SchemaManifest MakeSchemaManifest( return manifest; } +::parquet::arrow::SchemaField MakeNullSchemaField(const std::string& name, int field_id) { + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::null()) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + return schema_field; +} + +::parquet::arrow::SchemaField MakeListSchemaFieldWithNullElement(const std::string& name, + int field_id, + int element_field_id) { + ::parquet::arrow::SchemaField element_field = + MakeNullSchemaField("element", element_field_id); + + ::parquet::arrow::SchemaField schema_field; + schema_field.field = + ::arrow::field(name, ::arrow::list(element_field.field)) + ->WithMetadata(::arrow::key_value_metadata({std::string(kParquetFieldIdKey)}, + {std::to_string(field_id)})); + schema_field.children = {std::move(element_field)}; + return schema_field; +} + #define ASSERT_PROJECTED_FIELD(field_projection, index) \ ASSERT_EQ(field_projection.kind, FieldProjection::Kind::kProjected); \ ASSERT_EQ(std::get<1>(field_projection.from), index); @@ -303,6 +332,213 @@ TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionFloatToDouble) { ASSERT_PROJECTED_FIELD(projection.fields[0], 0); } +TEST(ParquetSchemaProjectionTest, ValidateSchemaEvolutionAllowsNullPhysicalType) { + ::parquet::arrow::SchemaField parquet_field; + parquet_field.field = ::arrow::field("value", ::arrow::null()); + + auto status = ValidateParquetSchemaEvolution(*iceberg::int32(), parquet_field); + ASSERT_THAT(status, IsOk()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "age", iceberg::int32()), + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/201, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "counts", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + MakeNullSchemaField("profile", /*field_id=*/2), + MakeNullSchemaField("numbers", /*field_id=*/3), + MakeNullSchemaField("counts", /*field_id=*/4), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + for (const auto& field_projection : projection.fields) { + ASSERT_PROJECTED_NULL_FIELD(field_projection); + ASSERT_TRUE(field_projection.children.empty()); + } + + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalFieldForRequiredField) { + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/1, "age", iceberg::int32()), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("age", /*field_id=*/1), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 1 as null")); +} + +TEST(ParquetSchemaProjectionTest, RejectNullPhysicalListElementForRequiredElement) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "numbers", + std::make_shared(SchemaField::MakeRequired( + /*field_id=*/101, "element", iceberg::int32()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeListSchemaFieldWithNullElement("numbers", /*field_id=*/1, + /*element_field_id=*/101), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 101 as null")); +} + +TEST(ParquetSchemaProjectionTest, ProjectUnknownExpectedFieldAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/1, "mystery", iceberg::unknown()), + }); + + auto parquet_schema = + MakeGroupNode("iceberg_schema", {MakeInt32Node("mystery", /*field_id=*/1)}); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0]); + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNullPhysicalFieldToNestedAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/4, "element", iceberg::string()))), + SchemaField::MakeOptional( + /*field_id=*/5, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/6, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/7, "value", iceberg::string()))), + }); + + ::parquet::arrow::SchemaManifest schema_manifest; + schema_manifest.schema_fields = { + MakeNullSchemaField("profile", /*field_id=*/1), + MakeNullSchemaField("items", /*field_id=*/3), + MakeNullSchemaField("properties", /*field_id=*/5), + }; + + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0]); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[1]); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2]); + ASSERT_TRUE(SelectedColumnIndices(projection).empty()); +} + +TEST(ParquetSchemaProjectionTest, ProjectNestedUnknownExpectedFieldsAsNull) { + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/2, "name", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/3, "mystery", iceberg::unknown()), + })), + SchemaField::MakeOptional( + /*field_id=*/4, "mysteries", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/5, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/6, "properties", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/7, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/8, "value", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/10, "mystery", iceberg::unknown()), + })), + }); + + auto parquet_schema = MakeGroupNode( + "iceberg_schema", + { + MakeGroupNode("profile", + {MakeStringNode("name", /*field_id=*/2), + MakeInt32Node("mystery", /*field_id=*/3)}, + /*field_id=*/1), + MakeListNode("mysteries", MakeInt32Node("element", /*field_id=*/5), + /*field_id=*/4), + MakeMapNode("properties", + MakeStringNode("key", /*field_id=*/7, /*optional=*/false), + MakeInt32Node("value", /*field_id=*/8), + /*field_id=*/6), + MakeGroupNode("wrapper", {MakeInt32Node("mystery", /*field_id=*/10)}, + /*field_id=*/9), + }); + + auto schema_manifest = MakeSchemaManifest(parquet_schema); + auto projection_result = Project(expected_schema, schema_manifest); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 4); + + ASSERT_PROJECTED_FIELD(projection.fields[0], 0); + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[0].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[0].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[1], 1); + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[1].children[0]); + + ASSERT_PROJECTED_FIELD(projection.fields[2], 2); + ASSERT_EQ(projection.fields[2].children.size(), 2); + ASSERT_PROJECTED_FIELD(projection.fields[2].children[0], 0); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[2].children[1]); + + ASSERT_PROJECTED_FIELD(projection.fields[3], 3); + ASSERT_EQ(projection.fields[3].children.size(), 1); + ASSERT_PROJECTED_NULL_FIELD(projection.fields[3].children[0]); + + ASSERT_EQ(SelectedColumnIndices(projection), std::vector({0, 2, 3, 4, 5})); +} + TEST(ParquetSchemaProjectionTest, ProjectSchemaEvolutionIncompatibleTypes) { Schema expected_schema({ SchemaField::MakeRequired(/*field_id=*/1, "value", iceberg::int32()), diff --git a/src/iceberg/test/parquet_test.cc b/src/iceberg/test/parquet_test.cc index 70fb9880f..ee1cbc931 100644 --- a/src/iceberg/test/parquet_test.cc +++ b/src/iceberg/test/parquet_test.cc @@ -33,6 +33,7 @@ #include #include #include +#include #include #include "iceberg/arrow/arrow_io_internal.h" @@ -461,6 +462,78 @@ TEST_F(ParquetReaderTest, ReadMetadataOnlyProjection) { ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, kExpectedJson)); } +TEST_F(ParquetReaderTest, ReadNestedUnknownProjection) { + temp_parquet_file_ = "nested_unknown.parquet"; + auto write_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", int32()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", int32()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", int32()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", int32()), + })), + }); + auto read_schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(2, "name", string()), + SchemaField::MakeOptional(3, "mystery", unknown()), + })), + SchemaField::MakeOptional( + 4, "mysteries", + std::make_shared(SchemaField::MakeOptional(5, "element", unknown()))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared(SchemaField::MakeRequired(7, "key", string()), + SchemaField::MakeOptional(8, "value", unknown()))), + SchemaField::MakeOptional(9, "wrapper", + std::make_shared(std::vector{ + SchemaField::MakeOptional(10, "mystery", unknown()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*write_schema, &arrow_c_schema), IsOk()); + auto arrow_type = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + auto array = ::arrow::json::ArrayFromJSONString(arrow_type, + R"([ + {"profile": {"name": "Person0", "mystery": 10}, "mysteries": [1, 2], "properties": [["a", 100], ["b", 200]], "wrapper": {"mystery": 300}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])") + .ValueOrDie(); + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = temp_parquet_file_, + .schema = write_schema, + .io = file_io_, + .properties = std::move(writer_properties)}), + IsOk()); + + ICEBERG_UNWRAP_OR_FAIL( + auto reader, + ReaderFactoryRegistry::Open( + FileFormatType::kParquet, + {.path = temp_parquet_file_, .io = file_io_, .projection = read_schema})); + + ASSERT_NO_FATAL_FAILURE(VerifyNextBatch(*reader, + R"([ + {"profile": {"name": "Person0", "mystery": null}, "mysteries": [null, null], "properties": [["a", null], ["b", null]], "wrapper": {"mystery": null}}, + {"profile": {"name": "Person1", "mystery": null}, "mysteries": [], "properties": [], "wrapper": {"mystery": null}} + ])")); + ASSERT_NO_FATAL_FAILURE(VerifyExhausted(*reader)); +} + class ParquetReadWrite : public ::testing::Test { protected: static void SetUpTestSuite() { parquet::RegisterAll(); } @@ -509,6 +582,142 @@ TEST_F(ParquetReadWrite, RejectsUnavailableCompressionCodec) { " is not available in the current build")); } +TEST_F(ParquetReadWrite, WritesUnknownFieldsNestedInsideListOrMapStructs) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "events", + std::make_shared(SchemaField::MakeOptional( + 3, ListType::kElementName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })))), + SchemaField::MakeOptional( + 6, "properties", + std::make_shared( + SchemaField::MakeRequired(7, MapType::kKeyName, iceberg::string()), + SchemaField::MakeOptional( + 8, MapType::kValueName, + std::make_shared(std::vector{ + SchemaField::MakeOptional(9, "label", string()), + SchemaField::MakeOptional(10, "secret", unknown()), + })))), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + {"id": 1, "events": [{"name": "open", "secret": null}, {"name": "close", "secret": null}], "properties": [["a", {"label": "A", "secret": null}]]}, + {"id": 2, "events": [], "properties": []} + ])") + .ValueOrDie(); + + std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "nested_unknown_fields.parquet"; + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = basePath, + .schema = schema, + .io = file_io, + .properties = std::move(writer_properties)}), + IsOk()); + + auto& arrow_file_io = internal::checked_cast(*file_io); + auto input_file = arrow_file_io.fs()->OpenInputFile(basePath).ValueOrDie(); + auto parquet_reader = ::parquet::ParquetFileReader::Open(input_file); + auto parquet_schema = parquet_reader->metadata()->schema(); + + std::vector field_ids; + for (int i = 0; i < parquet_schema->num_columns(); ++i) { + field_ids.push_back(parquet_schema->Column(i)->schema_node()->field_id()); + } + // Unknown fields (secret, IDs 5 and 10) are also written as null-type columns. + EXPECT_THAT(field_ids, ::testing::UnorderedElementsAre(1, 4, 5, 7, 9, 10)); + + std::shared_ptr<::arrow::Array> out; + ASSERT_THAT(ReadArray(out, {.path = basePath, .io = file_io, .projection = schema}, + /*metadata=*/nullptr), + IsOk()); + auto expected = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + {"id": 1, "events": [{"name": "open", "secret": null}, {"name": "close", "secret": null}], "properties": [["a", {"label": "A", "secret": null}]]}, + {"id": 2, "events": [], "properties": []} + ])") + .ValueOrDie(); + ASSERT_TRUE(out->Equals(*expected)) << "actual:\n" + << out->ToString() << "expected:\n" + << expected->ToString(); +} + +TEST_F(ParquetReadWrite, DoesNotMaterializeUnknownFieldsOnWrite) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeOptional(1, "id", int32()), + SchemaField::MakeOptional(2, "mystery", unknown()), + SchemaField::MakeOptional(3, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(4, "name", string()), + SchemaField::MakeOptional(5, "secret", unknown()), + })), + }); + + ArrowSchema arrow_c_schema; + ASSERT_THAT(ToArrowSchema(*schema, &arrow_c_schema), IsOk()); + auto arrow_schema = ::arrow::ImportType(&arrow_c_schema).ValueOrDie(); + + auto array = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])") + .ValueOrDie(); + + std::shared_ptr file_io = arrow::ArrowFileSystemFileIO::MakeMockFileIO(); + const std::string basePath = "unknown_fields.parquet"; + + WriterProperties writer_properties; + writer_properties.Set(WriterProperties::kParquetCompression, + std::string("uncompressed")); + ASSERT_THAT(WriteArray(array, {.path = basePath, + .schema = schema, + .io = file_io, + .properties = std::move(writer_properties)}), + IsOk()); + + auto& arrow_file_io = internal::checked_cast(*file_io); + auto input_file = arrow_file_io.fs()->OpenInputFile(basePath).ValueOrDie(); + auto parquet_reader = ::parquet::ParquetFileReader::Open(input_file); + auto parquet_schema = parquet_reader->metadata()->schema(); + + // Unknown fields (mystery, secret) are also written as null-type columns. + ASSERT_EQ(parquet_schema->num_columns(), 4); + EXPECT_EQ(parquet_schema->Column(0)->schema_node()->field_id(), 1); + EXPECT_EQ(parquet_schema->Column(1)->schema_node()->field_id(), 2); + EXPECT_EQ(parquet_schema->Column(2)->schema_node()->field_id(), 4); + EXPECT_EQ(parquet_schema->Column(3)->schema_node()->field_id(), 5); + + std::shared_ptr<::arrow::Array> out; + ASSERT_THAT(ReadArray(out, {.path = basePath, .io = file_io, .projection = schema}, + /*metadata=*/nullptr), + IsOk()); + auto expected = + ::arrow::json::ArrayFromJSONString(::arrow::struct_(arrow_schema->fields()), + R"([ + [1, null, {"name": "Person0", "secret": null}], + [2, null, {"name": "Person1", "secret": null}] + ])") + .ValueOrDie(); + ASSERT_TRUE(out->Equals(*expected)) << "actual:\n" + << out->ToString() << "expected:\n" + << expected->ToString(); +} + TEST_F(ParquetReadWrite, SimpleStructRoundTrip) { auto schema = std::make_shared(std::vector{ SchemaField::MakeOptional(1, "a", diff --git a/src/iceberg/test/schema_json_test.cc b/src/iceberg/test/schema_json_test.cc index c9532eeb6..08275a45c 100644 --- a/src/iceberg/test/schema_json_test.cc +++ b/src/iceberg/test/schema_json_test.cc @@ -64,6 +64,7 @@ INSTANTIATE_TEST_SUITE_P( SchemaJsonParam{.json = "\"string\"", .type = iceberg::string()}, SchemaJsonParam{.json = "\"binary\"", .type = iceberg::binary()}, SchemaJsonParam{.json = "\"uuid\"", .type = iceberg::uuid()}, + SchemaJsonParam{.json = "\"unknown\"", .type = iceberg::unknown()}, SchemaJsonParam{.json = "\"fixed[8]\"", .type = iceberg::fixed(8)}, SchemaJsonParam{.json = "\"decimal(10,2)\"", .type = iceberg::decimal(10, 2)}, SchemaJsonParam{.json = "\"date\"", .type = iceberg::date()}, @@ -136,6 +137,88 @@ TEST(SchemaJsonTest, RoundTrip) { ASSERT_EQ(dumped_json, json); } +TEST(SchemaJsonTest, UnknownFieldRoundTrip) { + constexpr std::string_view json = + R"({"fields":[{"id":1,"name":"mystery","required":false,"type":"unknown"}],"schema-id":1,"type":"struct"})"; + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(nlohmann::json::parse(json))); + ASSERT_EQ(schema->fields().size(), 1); + + const auto& field = schema->fields()[0]; + ASSERT_EQ(field.field_id(), 1); + ASSERT_EQ(field.name(), "mystery"); + ASSERT_EQ(field.type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(field.optional()); + ASSERT_EQ(ToJson(*schema).dump(), json); +} + +TEST(SchemaJsonTest, NestedUnknownFieldsRoundTrip) { + constexpr std::string_view json = + R"({ + "fields": [ + { + "id": 1, + "name": "profile", + "required": false, + "type": { + "fields": [ + {"id": 2, "name": "mystery", "required": false, "type": "unknown"} + ], + "type": "struct" + } + }, + { + "id": 3, + "name": "mysteries", + "required": false, + "type": { + "element": "unknown", + "element-id": 4, + "element-required": false, + "type": "list" + } + }, + { + "id": 5, + "name": "properties", + "required": false, + "type": { + "key": "string", + "key-id": 6, + "type": "map", + "value": "unknown", + "value-id": 7, + "value-required": false + } + } + ], + "schema-id": 1, + "type": "struct" + })"; + const auto parsed_json = nlohmann::json::parse(json); + + ICEBERG_UNWRAP_OR_FAIL(auto schema, SchemaFromJson(parsed_json)); + ASSERT_EQ(schema->fields().size(), 3); + + const auto* profile = dynamic_cast(schema->fields()[0].type().get()); + ASSERT_NE(profile, nullptr); + ASSERT_EQ(profile->fields().size(), 1); + ASSERT_EQ(profile->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(profile->fields()[0].optional()); + + const auto* mysteries = dynamic_cast(schema->fields()[1].type().get()); + ASSERT_NE(mysteries, nullptr); + ASSERT_EQ(mysteries->fields()[0].type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(mysteries->fields()[0].optional()); + + const auto* properties = dynamic_cast(schema->fields()[2].type().get()); + ASSERT_NE(properties, nullptr); + ASSERT_EQ(properties->value().type()->type_id(), TypeId::kUnknown); + ASSERT_TRUE(properties->value().optional()); + + ASSERT_EQ(ToJson(*schema), parsed_json); +} + TEST(SchemaJsonTest, IdentifierFieldIds) { // Test schema with identifier-field-ids constexpr std::string_view json_with_identifier_str = diff --git a/src/iceberg/test/schema_test.cc b/src/iceberg/test/schema_test.cc index 838b57600..8f1b20035 100644 --- a/src/iceberg/test/schema_test.cc +++ b/src/iceberg/test/schema_test.cc @@ -102,6 +102,8 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { {iceberg::SchemaField(1, "timestamp_ns", iceberg::timestamp_ns(), false)}); iceberg::Schema timestamptz_ns_schema( {iceberg::SchemaField(1, "timestamptz_ns", iceberg::timestamptz_ns(), false)}); + iceberg::Schema unknown_schema( + {iceberg::SchemaField(1, "unknown", iceberg::unknown(), true)}); auto status = timestamp_ns_schema.Validate(2); ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); @@ -115,12 +117,42 @@ TEST(SchemaTest, ValidateRejectsV3TypesBeforeFormatV3) { "Invalid type for timestamptz_ns: timestamptz_ns is not " "supported until v3")); + status = unknown_schema.Validate(2); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidSchema)); + EXPECT_THAT(status, iceberg::HasErrorMessage( + "Invalid type for unknown: unknown is not supported until v3")); + EXPECT_THAT( timestamp_ns_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); EXPECT_THAT(timestamptz_ns_schema.Validate( iceberg::TableMetadata::kSupportedTableFormatVersion), iceberg::IsOk()); + EXPECT_THAT( + unknown_schema.Validate(iceberg::TableMetadata::kSupportedTableFormatVersion), + iceberg::IsOk()); +} + +TEST(SchemaTest, ValidateRejectsInvalidUnknownFields) { + iceberg::Schema required_unknown_schema( + {iceberg::SchemaField(1, "mystery", iceberg::unknown(), false)}); + auto status = required_unknown_schema.Validate( + iceberg::TableMetadata::kSupportedTableFormatVersion); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidArgument)); + EXPECT_THAT(status, + iceberg::HasErrorMessage("Unknown type field 'mystery' must be optional")); + + iceberg::Schema map_key_unknown_schema({iceberg::SchemaField::MakeOptional( + 1, "properties", + std::make_shared( + iceberg::SchemaField::MakeRequired(2, iceberg::MapType::kKeyName, + iceberg::unknown()), + iceberg::SchemaField::MakeOptional(3, iceberg::MapType::kValueName, + iceberg::string())))}); + status = map_key_unknown_schema.Validate( + iceberg::TableMetadata::kSupportedTableFormatVersion); + ASSERT_THAT(status, iceberg::IsError(iceberg::ErrorKind::kInvalidArgument)); + EXPECT_THAT(status, iceberg::HasErrorMessage("Map 'key' cannot be unknown type")); } TEST(SchemaTest, IdentifierFields) { diff --git a/src/iceberg/test/schema_util_test.cc b/src/iceberg/test/schema_util_test.cc index fe6579ab3..ee075006f 100644 --- a/src/iceberg/test/schema_util_test.cc +++ b/src/iceberg/test/schema_util_test.cc @@ -226,6 +226,132 @@ TEST(SchemaUtilTest, ProjectSchemaEvolutionFloatToDouble) { AssertProjectedField(projection.fields[0], 0); } +TEST(SchemaUtilTest, ProjectSchemaEvolutionUnknownToPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 1); + AssertProjectedField(projection.fields[0], 0); +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredPrimitive) { + Schema source_schema( + {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::unknown())}); + Schema expected_schema( + {SchemaField::MakeRequired(/*field_id=*/2, "value", iceberg::string())}); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionNestedFieldsToUnknown) { + Schema source_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::int32()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::int32()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::int32()))), + }); + Schema expected_schema({ + SchemaField::MakeOptional( + /*field_id=*/2, "profile", + std::make_shared(std::vector{ + SchemaField::MakeOptional(/*field_id=*/201, "mystery", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/202, "name", iceberg::string()), + })), + SchemaField::MakeOptional( + /*field_id=*/3, "items", + std::make_shared(SchemaField::MakeOptional( + /*field_id=*/301, "element", iceberg::unknown()))), + SchemaField::MakeOptional( + /*field_id=*/4, "attributes", + std::make_shared( + SchemaField::MakeRequired(/*field_id=*/401, "key", iceberg::string()), + SchemaField::MakeOptional(/*field_id=*/402, "value", iceberg::unknown()))), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + AssertProjectedField(projection.fields[0], 0); + AssertProjectedField(projection.fields[1], 1); + AssertProjectedField(projection.fields[2], 2); + + ASSERT_EQ(projection.fields[0].children.size(), 2); + ASSERT_EQ(projection.fields[0].children[0].kind, FieldProjection::Kind::kNull); + AssertProjectedField(projection.fields[0].children[1], prune_source ? 0 : 1); + + ASSERT_EQ(projection.fields[1].children.size(), 1); + ASSERT_EQ(projection.fields[1].children[0].kind, FieldProjection::Kind::kNull); + + ASSERT_EQ(projection.fields[2].children.size(), 2); + AssertProjectedField(projection.fields[2].children[0], 0); + ASSERT_EQ(projection.fields[2].children[1].kind, FieldProjection::Kind::kNull); + } +} + +TEST(SchemaUtilTest, ProjectSchemaEvolutionUnknownToNestedAsNull) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/3, "items", iceberg::unknown()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", CreateNestedStruct()), + SchemaField::MakeOptional(/*field_id=*/3, "items", CreateListOfStruct()), + SchemaField::MakeOptional(/*field_id=*/4, "attributes", CreateMapWithStructValue()), + }); + + for (bool prune_source : {false, true}) { + auto projection_result = Project(expected_schema, source_schema, prune_source); + ASSERT_THAT(projection_result, IsOk()); + + const auto& projection = *projection_result; + ASSERT_EQ(projection.fields.size(), 3); + ASSERT_EQ(projection.fields[0].kind, FieldProjection::Kind::kNull); + ASSERT_EQ(projection.fields[1].kind, FieldProjection::Kind::kNull); + ASSERT_EQ(projection.fields[2].kind, FieldProjection::Kind::kNull); + } +} + +TEST(SchemaUtilTest, RejectSchemaEvolutionUnknownToRequiredNested) { + Schema source_schema({ + SchemaField::MakeOptional(/*field_id=*/2, "profile", iceberg::unknown()), + }); + Schema expected_schema({ + SchemaField::MakeRequired(/*field_id=*/2, "profile", CreateNestedStruct()), + }); + + auto projection_result = + Project(expected_schema, source_schema, /*prune_source=*/false); + ASSERT_THAT(projection_result, IsError(ErrorKind::kInvalidSchema)); + ASSERT_THAT(projection_result, + HasErrorMessage("Cannot project required field with id 2 as null")); +} + TEST(SchemaUtilTest, ProjectSchemaEvolutionDecimalCompatible) { Schema source_schema( {SchemaField::MakeOptional(/*field_id=*/2, "value", iceberg::decimal(9, 2))}); diff --git a/src/iceberg/test/type_test.cc b/src/iceberg/test/type_test.cc index e68843be4..d405cccc1 100644 --- a/src/iceberg/test/type_test.cc +++ b/src/iceberg/test/type_test.cc @@ -90,7 +90,7 @@ TEST_P(TypeTest, StdFormat) { ASSERT_EQ(test_case.repr, std::format("{}", *test_case.type)); } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -217,6 +217,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/test/update_schema_test.cc b/src/iceberg/test/update_schema_test.cc index 8550c8b56..07872e69a 100644 --- a/src/iceberg/test/update_schema_test.cc +++ b/src/iceberg/test/update_schema_test.cc @@ -20,6 +20,7 @@ #include "iceberg/update/update_schema.h" #include +#include #include @@ -1054,6 +1055,40 @@ TEST_F(UpdateSchemaTest, UpdateColumnFloatToDouble) { EXPECT_EQ(*field_opt->get().type(), *float64()); } +TEST_F(UpdateSchemaTest, UpdateColumnUnknownToPrimitive) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("mystery", unknown(), "A null-only placeholder"); + update->UpdateColumn("mystery", string()); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + + ICEBERG_UNWRAP_OR_FAIL(auto field_opt, result.schema->FindFieldByName("mystery")); + ASSERT_TRUE(field_opt.has_value()); + EXPECT_EQ(*field_opt->get().type(), *string()); + EXPECT_TRUE(field_opt->get().optional()); + EXPECT_EQ(field_opt->get().doc(), "A null-only placeholder"); +} + +TEST_F(UpdateSchemaTest, AddRequiredUnknownColumnFails) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AllowIncompatibleChanges().AddRequiredColumn("mystery", unknown()); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + +TEST_F(UpdateSchemaTest, AddColumnWithRequiredNestedUnknownFails) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); + update->AddColumn("profile", struct_({ + SchemaField::MakeRequired(3, "mystery", unknown()), + })); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Unknown type field 'mystery' must be optional")); +} + TEST_F(UpdateSchemaTest, UpdateColumnSameType) { ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateSchema()); update->AddColumn("id", int32()); diff --git a/src/iceberg/test/visit_type_test.cc b/src/iceberg/test/visit_type_test.cc index 7104581f5..f038f906f 100644 --- a/src/iceberg/test/visit_type_test.cc +++ b/src/iceberg/test/visit_type_test.cc @@ -53,7 +53,7 @@ std::string TypeTestCaseToString(const ::testing::TestParamInfo& i return info.param.name; } -const static std::array kPrimitiveTypes = {{ +const static std::array kPrimitiveTypes = {{ { .name = "boolean", .type = iceberg::boolean(), @@ -180,6 +180,13 @@ const static std::array kPrimitiveTypes = {{ .primitive = true, .repr = "uuid", }, + { + .name = "unknown", + .type = iceberg::unknown(), + .type_id = iceberg::TypeId::kUnknown, + .primitive = true, + .repr = "unknown", + }, }}; const static std::array kNestedTypes = {{ diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc index b5bee37e2..057dcf513 100644 --- a/src/iceberg/type.cc +++ b/src/iceberg/type.cc @@ -350,6 +350,10 @@ TypeId UuidType::type_id() const { return kTypeId; } std::string UuidType::ToString() const { return "uuid"; } bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; } +TypeId UnknownType::type_id() const { return kTypeId; } +std::string UnknownType::ToString() const { return "unknown"; } +bool UnknownType::Equals(const Type& other) const { return other.type_id() == kTypeId; } + FixedType::FixedType(int32_t length) : length_(length) { ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length); } @@ -392,6 +396,7 @@ TYPE_FACTORY(timestamptz_ns, TimestampTzNsType) TYPE_FACTORY(binary, BinaryType) TYPE_FACTORY(string, StringType) TYPE_FACTORY(uuid, UuidType) +TYPE_FACTORY(unknown, UnknownType) #undef TYPE_FACTORY @@ -455,6 +460,8 @@ std::string_view ToString(TypeId id) { return "fixed"; case TypeId::kBinary: return "binary"; + case TypeId::kUnknown: + return "unknown"; } std::unreachable(); diff --git a/src/iceberg/type.h b/src/iceberg/type.h index 53237cdb5..c0966759e 100644 --- a/src/iceberg/type.h +++ b/src/iceberg/type.h @@ -503,6 +503,21 @@ class ICEBERG_EXPORT UuidType : public PrimitiveType { bool Equals(const Type& other) const override; }; +/// \brief A null-only placeholder type used when a more specific type is not known. +class ICEBERG_EXPORT UnknownType : public PrimitiveType { + public: + constexpr static const TypeId kTypeId = TypeId::kUnknown; + + UnknownType() = default; + ~UnknownType() override = default; + + TypeId type_id() const override; + std::string ToString() const override; + + protected: + bool Equals(const Type& other) const override; +}; + /// @} /// \defgroup type-factories Factory functions for creating primitive data types @@ -538,6 +553,8 @@ ICEBERG_EXPORT const std::shared_ptr& binary(); ICEBERG_EXPORT const std::shared_ptr& string(); /// \brief Return a UuidType instance. ICEBERG_EXPORT const std::shared_ptr& uuid(); +/// \brief Return an UnknownType instance. +ICEBERG_EXPORT const std::shared_ptr& unknown(); /// \brief Create a DecimalType with the given precision and scale. /// \param precision The number of decimal digits (max 38). diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index 064ec285a..745c63acb 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -31,7 +31,7 @@ namespace iceberg { /// This is not a complete data type by itself because some types are nested /// and/or parameterized. /// -/// Iceberg V3 types are not currently supported. +/// Iceberg V3's `unknown` type is supported as a null-only placeholder type. enum class TypeId { kStruct, kList, @@ -52,6 +52,7 @@ enum class TypeId { kUuid, kFixed, kBinary, + kUnknown, }; /// \brief The time unit. In Iceberg V3 nanoseconds are also supported. @@ -83,6 +84,7 @@ class TimestampTzType; class TimestampNsType; class TimestampTzNsType; class Type; +class UnknownType; class UuidType; /// \brief Data values. diff --git a/src/iceberg/update/update_schema.cc b/src/iceberg/update/update_schema.cc index 1f35781fa..0e7f147b0 100644 --- a/src/iceberg/update/update_schema.cc +++ b/src/iceberg/update/update_schema.cc @@ -593,6 +593,7 @@ Result UpdateSchema::Apply() { ICEBERG_ASSIGN_OR_RAISE( auto new_schema, Schema::Make(std::move(new_fields), schema_->schema_id(), fresh_identifier_ids)); + ICEBERG_RETURN_UNEXPECTED(new_schema->Validate(base().format_version)); std::unordered_map updated_props; const auto& base_metadata = base(); diff --git a/src/iceberg/util/struct_like_set.cc b/src/iceberg/util/struct_like_set.cc index 433cfa681..12648ea5e 100644 --- a/src/iceberg/util/struct_like_set.cc +++ b/src/iceberg/util/struct_like_set.cc @@ -263,6 +263,8 @@ Status ValidateScalarAgainstType(const Scalar& scalar, const Type& type) { } switch (type.type_id()) { + case TypeId::kUnknown: + return InvalidArgument("Expected unknown but got {}", ScalarTypeName(scalar)); case TypeId::kBoolean: ICEBERG_PRECHECK(std::holds_alternative(scalar), "Expected boolean but got {}", ScalarTypeName(scalar)); diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index c6b9bb3ed..cb01be08f 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -426,6 +426,11 @@ bool IsPromotionAllowed(const std::shared_ptr& from_type, TypeId from_id = from_type->type_id(); TypeId to_id = to_type->type_id(); + // unknown -> any primitive type + if (from_id == TypeId::kUnknown) { + return true; + } + // int -> long if (from_id == TypeId::kInt && to_id == TypeId::kLong) { return true; diff --git a/src/iceberg/util/type_util.h b/src/iceberg/util/type_util.h index ceb5e62ec..8fd5ef19f 100644 --- a/src/iceberg/util/type_util.h +++ b/src/iceberg/util/type_util.h @@ -177,6 +177,7 @@ ICEBERG_EXPORT Result> AssignFreshIds( /// \brief Check if type promotion from one type to another is allowed. /// /// Type promotion rules: +/// - unknown -> any primitive type /// - int -> long /// - float -> double /// - decimal(P,S) -> decimal(P',S) where P' > P diff --git a/src/iceberg/util/visitor_generate.h b/src/iceberg/util/visitor_generate.h index 7a3648546..a5b0c2ced 100644 --- a/src/iceberg/util/visitor_generate.h +++ b/src/iceberg/util/visitor_generate.h @@ -38,6 +38,7 @@ namespace iceberg { ACTION(Uuid); \ ACTION(Fixed); \ ACTION(Binary); \ + ACTION(Unknown); \ ACTION(Struct); \ ACTION(List); \ ACTION(Map);