Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
private ValuesWriter definitionLevelColumn;
private ValuesWriter dataColumn;
private int valueCount;
// track the required field DataPageHeaderV2.num_nulls
// https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift
protected int nullCount;

private long rowsWrittenSoFar = 0;
private int pageRowCount;
Expand Down Expand Up @@ -115,6 +118,7 @@ public void writeNull(int repetitionLevel, int definitionLevel) {
definitionLevel(definitionLevel);
collector.writeNull(repetitionLevel, definitionLevel);
++valueCount;
++nullCount;
} catch (Throwable e) {
statusManager.abort();
throw e;
Expand Down Expand Up @@ -392,6 +396,7 @@ void writePage() {
definitionLevelColumn.reset();
dataColumn.reset();
valueCount = 0;
nullCount = 0;
collector.resetPageStatistics();
pageRowCount = 0;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void writePage(
Encoding encoding = values.getEncoding();
pageWriter.writePageV2(
rowCount,
Math.toIntExact(statistics.getNumNulls()),
nullCount,
valueCount,
repetitionLevels.getBytes(),
definitionLevels.getBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
Expand Down Expand Up @@ -858,4 +863,68 @@ public void testNoFlushAfterException() throws Exception {
FileSystem fs = file.getFileSystem(conf);
assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
}

@Test
public void testV2PageNullCountWithStatisticsDisabled() throws Exception {
// Regression test: when using PARQUET_2_0 with statistics disabled on a nullable column,
// DataPageHeaderV2.num_nulls must still contain the correct null count (not -1).
MessageType schema = Types.buildMessage()
.required(INT32)
.named("id")
.optional(BINARY)
.as(stringType())
.named("value")
.named("test_schema");

File file = temp.newFile();
file.delete();
Path path = new Path(file.getAbsolutePath());

int totalRecords = 10;
int expectedNulls = 4; // records where i % 3 == 0: i=0,3,6,9

// Write with PARQUET_2_0 and statistics disabled on the nullable "value" column
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
.withType(schema)
.withWriterVersion(PARQUET_2_0)
.withStatisticsEnabled("value", false)
.withPageSize(1024 * 1024) // large page to keep all records in one page
.build()) {
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
for (int i = 0; i < totalRecords; i++) {
Group group = factory.newGroup().append("id", i);
if (i % 3 != 0) {
group.append("value", "hello-" + i);
}
writer.write(group);
}
}

// Read back the page-level metadata and verify num_nulls
try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
MessageType fileSchema = reader.getFooter().getFileMetaData().getSchema();

// Find the "value" column descriptor
ColumnDescriptor valueColumn = fileSchema.getColumns().stream()
.filter(c -> c.getPath()[0].equals("value"))
.findFirst()
.orElseThrow(() -> new AssertionError("Column 'value' not found"));

PageReadStore rowGroup = reader.readNextRowGroup();
PageReader pageReader = rowGroup.getPageReader(valueColumn);
DataPage page = pageReader.readPage();

// Verify it's a V2 page (because we used PARQUET_2_0)
assertTrue(
"PARQUET_2_0 writer should produce DataPageV2 pages, got: "
+ page.getClass().getSimpleName(),
page instanceof DataPageV2);

DataPageV2 pageV2 = (DataPageV2) page;
assertEquals(
"DataPageV2.num_nulls should be the actual null count even when statistics are disabled",
expectedNulls,
pageV2.getNullCount());
}
}
}