@@ -70,6 +70,25 @@ type Database struct {
 	seekCompGauge       metrics.Gauge // Gauge for tracking the number of table compaction caused by read opt
 	manualMemAllocGauge metrics.Gauge // Gauge for tracking amount of non-managed memory currently allocated
 
+	compDebtGauge       metrics.Gauge
+	compInProgressGauge metrics.Gauge
+
+	commitCountMeter               metrics.Meter
+	commitTotalDurationMeter       metrics.Meter
+	commitSemaphoreWaitMeter       metrics.Meter
+	commitMemTableWriteStallMeter  metrics.Meter
+	commitL0ReadAmpWriteStallMeter metrics.Meter
+	commitWALRotationMeter         metrics.Meter
+	commitWaitMeter                metrics.Meter
+
+	commitCount               atomic.Int64
+	commitTotalDuration       atomic.Int64
+	commitSemaphoreWait       atomic.Int64
+	commitMemTableWriteStall  atomic.Int64
+	commitL0ReadAmpWriteStall atomic.Int64
+	commitWALRotation         atomic.Int64
+	commitWait                atomic.Int64
+
 	levelsGauge []metrics.Gauge // Gauge for tracking the number of tables in levels
 
 	quitLock sync.RWMutex // Mutex protecting the quit channel and the closed flag
@@ -137,7 +156,38 @@ func (l panicLogger) Fatalf(format string, args ...interface{}) {
 
 // New returns a wrapped pebble DB object. The namespace is the prefix that the
 // metrics reporting should use for surfacing internal stats.
-func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool) (*Database, error) {
+func New(file string, cache int, handles int, namespace string, readonly bool, ephemeral bool, extraOptions *ExtraOptions) (*Database, error) {
+	if extraOptions == nil {
+		extraOptions = &ExtraOptions{}
+	}
+	if extraOptions.MemTableStopWritesThreshold <= 0 {
+		extraOptions.MemTableStopWritesThreshold = 2
+	}
+	if extraOptions.MaxConcurrentCompactions == nil {
+		extraOptions.MaxConcurrentCompactions = func() int { return runtime.NumCPU() }
+	}
+	var levels []pebble.LevelOptions
+	if len(extraOptions.Levels) == 0 {
+		levels = []pebble.LevelOptions{
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
+		}
+	} else {
+		for _, level := range extraOptions.Levels {
+			levels = append(levels, pebble.LevelOptions{
+				BlockSize:      level.BlockSize,
+				IndexBlockSize: level.IndexBlockSize,
+				TargetFileSize: level.TargetFileSize,
+				FilterPolicy:   bloom.FilterPolicy(10),
+			})
+		}
+	}
+
 	// Ensure we have some minimal caching and file guarantees
 	if cache < minCache {
 		cache = minCache
@@ -162,7 +212,7 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
 
 	// Two memory tables is configured which is identical to leveldb,
 	// including a frozen memory table and another live one.
-	memTableLimit := 2
+	memTableLimit := extraOptions.MemTableStopWritesThreshold
 	memTableSize := cache * 1024 * 1024 / 2 / memTableLimit
 
 	// The memory table size is currently capped at maxMemTableSize-1 due to a
@@ -200,19 +250,11 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
 
 		// The default compaction concurrency(1 thread),
 		// Here use all available CPUs for faster compaction.
-		MaxConcurrentCompactions: func() int { return runtime.NumCPU() },
+		MaxConcurrentCompactions: extraOptions.MaxConcurrentCompactions,
 
-		// Per-level options. Options for at least one level must be specified. The
-		// options for the last level are used for all subsequent levels.
-		Levels: []pebble.LevelOptions{
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-			{TargetFileSize: 2 * 1024 * 1024, FilterPolicy: bloom.FilterPolicy(10)},
-		},
+		// Per-level extraOptions. Options for at least one level must be specified. The
+		// extraOptions for the last level are used for all subsequent levels.
+		Levels:   levels,
 		ReadOnly: readonly,
 		EventListener: &pebble.EventListener{
 			CompactionBegin: db.onCompactionBegin,
@@ -221,11 +263,31 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
 			WriteStallEnd:   db.onWriteStallEnd,
 		},
 		Logger: panicLogger{}, // TODO(karalabe): Delete when this is upstreamed in Pebble
+
+		BytesPerSync:                extraOptions.BytesPerSync,
+		L0CompactionFileThreshold:   extraOptions.L0CompactionFileThreshold,
+		L0CompactionThreshold:       extraOptions.L0CompactionThreshold,
+		L0StopWritesThreshold:       extraOptions.L0StopWritesThreshold,
+		LBaseMaxBytes:               extraOptions.LBaseMaxBytes,
+		DisableAutomaticCompactions: extraOptions.DisableAutomaticCompactions,
+		WALBytesPerSync:             extraOptions.WALBytesPerSync,
+		WALDir:                      extraOptions.WALDir,
+		WALMinSyncInterval:          extraOptions.WALMinSyncInterval,
+		TargetByteDeletionRate:      extraOptions.TargetByteDeletionRate,
 	}
 	// Disable seek compaction explicitly. Check https://github.com/ethereum/go-ethereum/pull/20130
 	// for more details.
 	opt.Experimental.ReadSamplingMultiplier = -1
 
+	if opt.Experimental.ReadSamplingMultiplier != 0 {
+		opt.Experimental.ReadSamplingMultiplier = extraOptions.Experimental.ReadSamplingMultiplier
+	}
+	opt.Experimental.L0CompactionConcurrency = extraOptions.Experimental.L0CompactionConcurrency
+	opt.Experimental.CompactionDebtConcurrency = extraOptions.Experimental.CompactionDebtConcurrency
+	opt.Experimental.ReadCompactionRate = extraOptions.Experimental.ReadCompactionRate
+	opt.Experimental.MaxWriterConcurrency = extraOptions.Experimental.MaxWriterConcurrency
+	opt.Experimental.ForceWriterParallelism = extraOptions.Experimental.ForceWriterParallelism
+
 	// Open the db and recover any potential corruptions
 	innerDB, err := pebble.Open(file, opt)
 	if err != nil {
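
The `ExtraOptions` type consumed above is not defined anywhere in this diff. For orientation, here is a minimal sketch of what it could look like, inferred only from the fields that the new `New()` code reads. The type names `ExtraLevelOptions` and `ExtraOptionsExperimental`, the field types (chosen to mirror the corresponding `pebble.Options` fields), and the package placement are assumptions, not the committed definition.

```go
package pebble // assumption: defined alongside New() in the same wrapper package

import "time"

// ExtraOptions (sketch): every field below is read somewhere in New();
// zero values fall back to the defaults applied there.
type ExtraOptions struct {
	BytesPerSync                int
	L0CompactionFileThreshold   int
	L0CompactionThreshold       int
	L0StopWritesThreshold       int
	LBaseMaxBytes               int64
	MemTableStopWritesThreshold int        // defaulted to 2 when <= 0
	MaxConcurrentCompactions    func() int // defaulted to runtime.NumCPU when nil
	DisableAutomaticCompactions bool
	WALBytesPerSync             int
	WALDir                      string
	WALMinSyncInterval          func() time.Duration
	TargetByteDeletionRate      int
	// Per-level overrides; New() always forces FilterPolicy to bloom.FilterPolicy(10).
	Levels []ExtraLevelOptions
	// Knobs forwarded into pebble's Options.Experimental.
	Experimental ExtraOptionsExperimental
}

// ExtraLevelOptions (sketch): the per-level fields copied into pebble.LevelOptions.
type ExtraLevelOptions struct {
	BlockSize      int
	IndexBlockSize int
	TargetFileSize int64
}

// ExtraOptionsExperimental (sketch): the fields copied into opt.Experimental.
type ExtraOptionsExperimental struct {
	L0CompactionConcurrency   int
	CompactionDebtConcurrency uint64
	ReadCompactionRate        int64
	ReadSamplingMultiplier    int64
	MaxWriterConcurrency      int
	ForceWriterParallelism    bool
}
```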
@@ -247,6 +309,17 @@ func New(file string, cache int, handles int, namespace string, readonly bool, e
 	db.seekCompGauge = metrics.GetOrRegisterGauge(namespace+"compact/seek", nil)
 	db.manualMemAllocGauge = metrics.GetOrRegisterGauge(namespace+"memory/manualalloc", nil)
 
+	db.compDebtGauge = metrics.GetOrRegisterGauge(namespace+"compact/debt", nil)
+	db.compInProgressGauge = metrics.GetOrRegisterGauge(namespace+"compact/inprogress", nil)
+
+	db.commitCountMeter = metrics.GetOrRegisterMeter(namespace+"commit/counter", nil)
+	db.commitTotalDurationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/total", nil)
+	db.commitSemaphoreWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/semaphorewait", nil)
+	db.commitMemTableWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/memtablewritestall", nil)
+	db.commitL0ReadAmpWriteStallMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/l0readampwritestall", nil)
+	db.commitWALRotationMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/walrotation", nil)
+	db.commitWaitMeter = metrics.GetOrRegisterMeter(namespace+"commit/duration/commitwait", nil)
+
 	// Start up the metrics gathering and return
 	go db.meter(metricsGatheringInterval, namespace)
 	return db, nil
@@ -459,6 +532,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
 		compReads [2]int64
 
 		nWrites [2]int64
+
+		commitCounts               [2]int64
+		commitTotalDurations       [2]int64
+		commitSemaphoreWaits       [2]int64
+		commitMemTableWriteStalls  [2]int64
+		commitL0ReadAmpWriteStalls [2]int64
+		commitWALRotations         [2]int64
+		commitWaits                [2]int64
 	)
 
 	// Iterate ad infinitum and collect the stats
@@ -474,6 +555,14 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
 			writeDelayTime      = d.writeDelayTime.Load()
 			nonLevel0CompCount  = int64(d.nonLevel0Comp.Load())
 			level0CompCount     = int64(d.level0Comp.Load())
+
+			commitCount               = d.commitCount.Load()
+			commitTotalDuration       = d.commitTotalDuration.Load()
+			commitSemaphoreWait       = d.commitSemaphoreWait.Load()
+			commitMemTableWriteStall  = d.commitMemTableWriteStall.Load()
+			commitL0ReadAmpWriteStall = d.commitL0ReadAmpWriteStall.Load()
+			commitWALRotation         = d.commitWALRotation.Load()
+			commitWait                = d.commitWait.Load()
 		)
 		writeDelayTimes[i%2] = writeDelayTime
 		writeDelayCounts[i%2] = writeDelayCount
@@ -524,6 +613,25 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
 		d.level0CompGauge.Update(level0CompCount)
 		d.seekCompGauge.Update(stats.Compact.ReadCount)
 
+		commitCounts[i%2] = commitCount
+		commitTotalDurations[i%2] = commitTotalDuration
+		commitSemaphoreWaits[i%2] = commitSemaphoreWait
+		commitMemTableWriteStalls[i%2] = commitMemTableWriteStall
+		commitL0ReadAmpWriteStalls[i%2] = commitL0ReadAmpWriteStall
+		commitWALRotations[i%2] = commitWALRotation
+		commitWaits[i%2] = commitWait
+
+		d.commitCountMeter.Mark(commitCounts[i%2] - commitCounts[(i-1)%2])
+		d.commitTotalDurationMeter.Mark(commitTotalDurations[i%2] - commitTotalDurations[(i-1)%2])
+		d.commitSemaphoreWaitMeter.Mark(commitSemaphoreWaits[i%2] - commitSemaphoreWaits[(i-1)%2])
+		d.commitMemTableWriteStallMeter.Mark(commitMemTableWriteStalls[i%2] - commitMemTableWriteStalls[(i-1)%2])
+		d.commitL0ReadAmpWriteStallMeter.Mark(commitL0ReadAmpWriteStalls[i%2] - commitL0ReadAmpWriteStalls[(i-1)%2])
+		d.commitWALRotationMeter.Mark(commitWALRotations[i%2] - commitWALRotations[(i-1)%2])
+		d.commitWaitMeter.Mark(commitWaits[i%2] - commitWaits[(i-1)%2])
+
+		d.compDebtGauge.Update(int64(stats.Compact.EstimatedDebt))
+		d.compInProgressGauge.Update(stats.Compact.NumInProgress)
+
 		for i, level := range stats.Levels {
 			// Append metrics for additional layers
 			if i >= len(d.levelsGauge) {
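
For context on the marks above: `meter()` samples each cumulative counter into a two-slot ring and marks only the difference from the previous pass, so the running totals accumulated by batch commits become per-interval rates. A stripped-down sketch of that pattern, using a hypothetical `driveMeter` helper (not in the diff) and assuming go-ethereum's `metrics` package:

```go
import (
	"sync/atomic"
	"time"

	"github.com/ethereum/go-ethereum/metrics"
)

// driveMeter is a hypothetical helper illustrating the two-slot delta pattern
// used in meter(): it turns a monotonically growing counter into per-interval
// Meter marks.
func driveMeter(counter *atomic.Int64, meter metrics.Meter, refresh time.Duration) {
	var samples [2]int64
	for i := 1; ; i++ { // starting at 1 keeps (i-1)%2 a valid index on the first pass
		samples[i%2] = counter.Load()               // cumulative total so far
		meter.Mark(samples[i%2] - samples[(i-1)%2]) // report only what accrued this interval
		time.Sleep(refresh)
	}
}
```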
@@ -578,7 +686,20 @@ func (b *batch) Write() error {
 	if b.db.closed {
 		return pebble.ErrClosed
 	}
-	return b.b.Commit(b.db.writeOptions)
+	err := b.b.Commit(b.db.writeOptions)
+	if err != nil {
+		return err
+	}
+	stats := b.b.CommitStats()
+	b.db.commitCount.Add(1)
+	b.db.commitTotalDuration.Add(int64(stats.TotalDuration))
+	b.db.commitSemaphoreWait.Add(int64(stats.SemaphoreWaitDuration))
+	b.db.commitMemTableWriteStall.Add(int64(stats.MemTableWriteStallDuration))
+	b.db.commitL0ReadAmpWriteStall.Add(int64(stats.L0ReadAmpWriteStallDuration))
+	b.db.commitWALRotation.Add(int64(stats.WALRotationDuration))
+	b.db.commitWait.Add(int64(stats.CommitWaitDuration))
+	// TODO add metric for stats.WALQueueWaitDuration when it will be used by pebble (currently it is always 0)
+	return nil
 }
 
 // Reset resets the batch for reuse.
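
Finally, a caller-side sketch (not part of the diff) of how the extended constructor might be used. The import path, alias, and option values are illustrative assumptions; zero-value `ExtraOptions` fields fall back to the defaults applied inside `New()`.

```go
package main

import (
	"log"

	// Assumed import path for the wrapper package shown in this diff.
	gethpebble "github.com/ethereum/go-ethereum/ethdb/pebble"
)

func main() {
	// Illustrative values: 512 MiB cache, 128 file handles, default metrics namespace.
	db, err := gethpebble.New("./chaindata", 512, 128, "eth/db/chaindata/", false, false,
		&gethpebble.ExtraOptions{
			MemTableStopWritesThreshold: 4,
			MaxConcurrentCompactions:    func() int { return 4 },
		})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}
```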