diff --git a/cmd/seq-db/seq-db.go b/cmd/seq-db/seq-db.go index b0a7c47e..59d05eaa 100644 --- a/cmd/seq-db/seq-db.go +++ b/cmd/seq-db/seq-db.go @@ -271,6 +271,7 @@ func startStore( TokenTableZstdLevel: cfg.Compression.SealedZstdCompressionLevel, DocBlocksZstdLevel: cfg.Compression.DocBlockZstdCompressionLevel, DocBlockSize: int(cfg.DocsSorting.DocBlockSize), + SkipFsync: cfg.Resources.SkipFsync, }, Fraction: frac.Config{ Search: frac.SearchConfig{ @@ -283,6 +284,7 @@ func startStore( }, SkipSortDocs: !cfg.DocsSorting.Enabled, KeepMetaFile: false, + SkipFsync: cfg.Resources.SkipFsync, }, OffloadingEnabled: cfg.Offloading.Enabled, OffloadingRetention: cfg.Offloading.Retention, diff --git a/frac/active.go b/frac/active.go index 75d04e16..25231dac 100644 --- a/frac/active.go +++ b/frac/active.go @@ -71,9 +71,9 @@ func NewActive( cfg *Config, skipMaskProvider skipMaskProvider, ) *Active { - docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, config.SkipFsync) + docsFile, docsStats := mustOpenFile(baseFileName+consts.DocsFileSuffix, cfg.SkipFsync) - metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats) + metaFile, writer, metaReader, walReader, metaSize := mustOpenMetaWriter(baseFileName, readLimiter, docsFile, docsStats, cfg.SkipFsync) f := &Active{ TokenList: NewActiveTokenList(config.IndexWorkers), @@ -117,24 +117,24 @@ func mustOpenMetaWriter( readLimiter *storage.ReadLimiter, docsFile *os.File, docsStats os.FileInfo, -) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) { + skipFsync bool) (*os.File, *ActiveWriter, *storage.DocBlocksReader, *storage.WalReader, uint64) { legacyMetaFileName := baseFileName + consts.MetaFileSuffix if _, err := os.Stat(legacyMetaFileName); err == nil { // .meta file exists - metaFile, metaStats := mustOpenFile(legacyMetaFileName, config.SkipFsync) + metaFile, metaStats := mustOpenFile(legacyMetaFileName, skipFsync) metaSize := uint64(metaStats.Size()) metaReader := storage.NewDocBlocksReader(readLimiter, metaFile) - writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + writer := NewActiveWriterLegacy(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync) logger.Info("using legacy meta file format", zap.String("fraction", baseFileName)) return metaFile, writer, &metaReader, nil, metaSize } logger.Info("using new WAL format", zap.String("fraction", baseFileName)) walFileName := baseFileName + consts.WalFileSuffix - metaFile, metaStats := mustOpenFile(walFileName, config.SkipFsync) + metaFile, metaStats := mustOpenFile(walFileName, skipFsync) metaSize := uint64(metaStats.Size()) - writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), config.SkipFsync) + writer := NewActiveWriter(docsFile, metaFile, docsStats.Size(), metaStats.Size(), skipFsync) walReader, err := storage.NewWalReader(readLimiter, metaFile, baseFileName) if err != nil { logger.Fatal("failed to initialize WAL reader", zap.String("fraction", baseFileName), zap.Error(err)) diff --git a/frac/active_sealing_source.go b/frac/active_sealing_source.go index e7c451e2..36683df3 100644 --- a/frac/active_sealing_source.go +++ b/frac/active_sealing_source.go @@ -31,28 +31,24 @@ type ( ) type ActiveSealingSource struct { - params common.SealParams // Sealing parameters - - info *common.Info // fraction Info - created time.Time // Creation time of the source - - blocksOffsets []uint64 // Document block offsets - - sortedLIDs []uint32 // Sorted LIDs (Local ID) - oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting) - - mids *UInt64s // MIDs - rids *UInt64s // RIDs - - fields []string // Sorted field names - fieldTIDs [][]uint32 // Each field contains sorted TIDs based on token value - - tokens [][]byte // Tokens (values) by TID - lids []*TokenLIDs // LID lists for each token - - docPosMap map[seq.ID]seq.DocPos // Original document positions - docPosSorted []seq.DocPos // Document positions after sorting - docsReader *storage.DocsReader // Document storage reader + params common.SealParams // Sealing parameters + info *common.Info // fraction Info + created time.Time // Creation time of the source + sortedLIDs []uint32 // Sorted LIDs (Local ID) + oldToNewLIDs []uint32 // Mapping from old LIDs to new ones (after sorting) + mids *UInt64s // MIDs + rids *UInt64s // RIDs + fields []string // Sorted field names + fieldsMaxTIDs []uint32 // Maximum TIDs for each field + tids []uint32 // Sorted TIDs (Token ID) + tokens [][]byte // Tokens (values) by TID + lids []*TokenLIDs // LID lists for each token + docPosMap map[seq.ID]seq.DocPos // Original document positions + docPosSorted []seq.DocPos // Document positions after sorting + blocksOffsets []uint64 // Document block offsets + docsReader *storage.DocsReader // Document storage reader + lastErr error // Last error + skipFsync bool } func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSealingSource, error) { @@ -81,6 +77,7 @@ func NewActiveSealingSource(active *Active, params common.SealParams) (*ActiveSe docPosMap: active.DocsPositions.idToPos, blocksOffsets: active.DocBlocks.vals, docsReader: &active.sortReader, + skipFsync: active.Config.SkipFsync, } src.prepareInfo() @@ -313,9 +310,10 @@ func (src *ActiveSealingSource) SortDocs() error { } src.info.DocsOnDisk = uint64(stat.Size()) - // Synchronize and rename file - if err := sdocsFile.Sync(); err != nil { - return err + if !src.skipFsync { + if err := sdocsFile.Sync(); err != nil { + return err + } } if err := sdocsFile.Close(); err != nil { @@ -325,9 +323,10 @@ func (src *ActiveSealingSource) SortDocs() error { if err := os.Rename(sdocsFile.Name(), src.info.Path+consts.SdocsFileSuffix); err != nil { return err } - - if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil { - return err + if !src.skipFsync { + if err := util.SyncPath(filepath.Dir(src.info.Path)); err != nil { + return err + } } // Log compression statistics diff --git a/frac/common/seal_params.go b/frac/common/seal_params.go index c19365f9..443179f1 100644 --- a/frac/common/seal_params.go +++ b/frac/common/seal_params.go @@ -9,4 +9,6 @@ type SealParams struct { DocBlocksZstdLevel int // DocBlocksZstdLevel is the zstd compress level of each document block. DocBlockSize int // DocBlockSize is decompressed payload size of document block. + + SkipFsync bool } diff --git a/frac/config.go b/frac/config.go index 3b1c1e97..ac315b41 100644 --- a/frac/config.go +++ b/frac/config.go @@ -5,6 +5,7 @@ type Config struct { SkipSortDocs bool KeepMetaFile bool + SkipFsync bool } type SearchConfig struct { diff --git a/frac/sealed/sealing/sealer.go b/frac/sealed/sealing/sealer.go index 57863d82..5f7ee9df 100644 --- a/frac/sealed/sealing/sealer.go +++ b/frac/sealed/sealing/sealer.go @@ -57,12 +57,11 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { return nil, err } - if err := createAndWrite( - info.Path+consts.IDTmpFileSuffix, - info.Path+consts.IDFileSuffix, - func(f *os.File) error { return sealer.WriteIDFile(f, src) }, - ); err != nil { - return nil, err + if !params.SkipFsync { + // Ensure data is flushed to disk + if err := indexFile.Sync(); err != nil { + return nil, err + } } if err := createAndWriteBoth( @@ -99,8 +98,10 @@ func Seal(src Source, params common.SealParams) (*sealed.PreloadedData, error) { totalSize += uint64(st.Size()) } - info.IndexOnDisk = totalSize - lidsTable := sealer.LIDsTable() + if !params.SkipFsync { + // Ensure directory metadata is synced to disk + util.MustSyncPath(filepath.Dir(info.Path)) + } preloaded := &sealed.PreloadedData{ Info: info,