Skip to content

Commit 11dd8a9

Browse files
Quota enforcing sector allocator
``` Moves disk quota enforcement to sector allocator, this allows the file pool to have arbitrary large sparse files without running into quota issues. A consequence of this is that very large sparse files are now a first class citizen of Buildbarn. We have therefore modified the sparse files implementation to use a struct inspired by the unix inode pointer struct. ```
1 parent ab42e02 commit 11dd8a9

12 files changed

+1121
-342
lines changed

cmd/bb_worker/main.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,9 @@ func main() {
7676
// currently only used by the virtual file system to store
7777
// output files of build actions. Going forward, this may be
7878
// used to store core dumps generated by build actions as well.
79-
filePool, err := pool.NewFilePoolFromConfiguration(configuration.FilePool)
79+
filePoolFactory, baseSectorAllocator, sectorSizeBytes, err := pool.NewFilePoolFactoryFromConfiguration(configuration.FilePool)
8080
if err != nil {
81-
return util.StatusWrap(err, "Failed to create file pool")
81+
return util.StatusWrap(err, "Failed to create file pool factory")
8282
}
8383

8484
// Storage access.
@@ -490,13 +490,21 @@ func main() {
490490
return util.StatusWrapf(err, "Invalid instance name prefix %#v", runnerConfiguration.InstanceNamePrefix)
491491
}
492492

493+
maximumSectors := runnerConfiguration.MaximumFilePoolSizeBytes / uint64(sectorSizeBytes)
494+
runnerFilePool := pool.NewQuotaEnforcingFilePool(
495+
filePoolFactory.NewFilePool(
496+
pool.NewQuotaEnforcingSectorAllocator(
497+
baseSectorAllocator,
498+
int64(maximumSectors),
499+
),
500+
),
501+
int64(runnerConfiguration.MaximumFilePoolFileCount),
502+
)
503+
493504
buildClient := builder.NewBuildClient(
494505
schedulerClient,
495506
buildExecutor,
496-
pool.NewQuotaEnforcingFilePool(
497-
filePool,
498-
runnerConfiguration.MaximumFilePoolFileCount,
499-
runnerConfiguration.MaximumFilePoolSizeBytes),
507+
runnerFilePool,
500508
clock.SystemClock,
501509
workerID,
502510
instanceNamePrefix,

pkg/filesystem/pool/BUILD.bazel

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,10 @@ go_library(
1111
"hole_source.go",
1212
"metrics_file_pool.go",
1313
"quota_enforcing_file_pool.go",
14+
"quota_enforcing_sector_allocator.go",
15+
"quota_metric.go",
1416
"sector_allocator.go",
17+
"sector_map.go",
1518
],
1619
importpath = "github.com/buildbarn/bb-remote-execution/pkg/filesystem/pool",
1720
visibility = ["//visibility:public"],
@@ -33,6 +36,8 @@ go_test(
3336
"block_device_backed_file_pool_test.go",
3437
"empty_file_pool_test.go",
3538
"quota_enforcing_file_pool_test.go",
39+
"quota_enforcing_sector_allocator_test.go",
40+
"sector_map_test.go",
3641
],
3742
deps = [
3843
":pool",

pkg/filesystem/pool/block_device_backed_file_pool.go

Lines changed: 80 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,20 @@ func (fp *blockDeviceBackedFilePool) NewFile(holeSource HoleSource, size uint64)
3737
fp: fp,
3838
holeSource: holeSource,
3939
sizeBytes: size,
40+
sm: SectorMap{},
4041
}, nil
4142
}
4243

4344
type blockDeviceBackedFile struct {
4445
fp *blockDeviceBackedFilePool
4546
holeSource HoleSource
4647
sizeBytes uint64
47-
sectors []uint32
48+
sm SectorMap
4849
}
4950

5051
func (f *blockDeviceBackedFile) Close() error {
51-
if len(f.sectors) > 0 {
52-
f.fp.sectorAllocator.FreeList(f.sectors)
53-
}
52+
f.sm.FreeSectors(0, f.fp.sectorAllocator.FreeList)
5453
f.fp = nil
55-
f.sectors = nil
5654
err := f.holeSource.Close()
5755
f.holeSource = nil
5856
return err
@@ -66,44 +64,48 @@ func (f *blockDeviceBackedFile) toDeviceOffset(sector uint32, offsetWithinSector
6664

6765
// getInitialSectorIndex is called by ReadAt() and WriteAt() to
6866
// determine which sectors in a file are affected by the operation.
69-
func (f *blockDeviceBackedFile) getInitialSectorIndex(off int64, n int) (int, int, int) {
70-
firstSectorIndex := int(off / int64(f.fp.sectorSizeBytes))
71-
endSectorIndex := int((uint64(off) + uint64(n) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
72-
if endSectorIndex > len(f.sectors) {
73-
endSectorIndex = len(f.sectors)
74-
}
67+
func (f *blockDeviceBackedFile) getInitialSectorIndex(off int64, n int) (uint64, uint64, int) {
68+
firstSectorIndex := uint64(off / int64(f.fp.sectorSizeBytes))
69+
endSectorIndex := (uint64(off) + uint64(n) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes)
7570
offsetWithinSector := int(off % int64(f.fp.sectorSizeBytes))
7671
return firstSectorIndex, endSectorIndex - 1, offsetWithinSector
7772
}
7873

7974
// incrementSectorIndex is called by ReadAt() and WriteAt() to progress
8075
// to the next sequence of contiguously stored sectors.
81-
func (f *blockDeviceBackedFile) incrementSectorIndex(sectorIndex, offsetWithinSector *int, n int) {
76+
func (f *blockDeviceBackedFile) incrementSectorIndex(sectorIndex *uint64, offsetWithinSector *int, n int) {
8277
if (*offsetWithinSector+n)%f.fp.sectorSizeBytes != 0 {
8378
panic("Read or write did not finish at sector boundary")
8479
}
85-
*sectorIndex += (*offsetWithinSector + n) / f.fp.sectorSizeBytes
80+
*sectorIndex += uint64((*offsetWithinSector + n) / f.fp.sectorSizeBytes)
8681
*offsetWithinSector = 0
8782
}
8883

8984
// getSectorsContiguous converts an index of a sector in a file to the
9085
// on-disk sector number. It also computes how many sectors are stored
9186
// contiguously starting at this point.
92-
func (f *blockDeviceBackedFile) getSectorsContiguous(firstSectorIndex, lastSectorIndex int) (uint32, int) {
93-
firstSector := f.sectors[firstSectorIndex]
94-
nContiguous := 1
87+
func (f *blockDeviceBackedFile) getSectorsContiguous(firstSectorIndex, lastSectorIndex uint64) (uint32, int) {
88+
firstSector := f.sm.GetPhysicalIndex(firstSectorIndex)
89+
nContiguous := 0
9590
if firstSector == 0 {
9691
// A hole in a sparse file. Determine the size of the hole.
97-
for firstSectorIndex+nContiguous <= lastSectorIndex &&
98-
f.sectors[firstSectorIndex+nContiguous] == 0 {
99-
nContiguous++
92+
nextMapped, err := f.sm.GetNextMappedSector(firstSectorIndex)
93+
if err == io.EOF {
94+
nextMapped = lastSectorIndex + 1
95+
} else if err != nil {
96+
panic(fmt.Errorf("unexpected error from SectorMap.GetNextMappedSector: %w"))
10097
}
98+
nextMapped = min(nextMapped, lastSectorIndex+1)
99+
nContiguous = int(nextMapped - firstSectorIndex)
101100
} else {
102101
// A region that contains actual data. Determine how
103102
// many sectors are contiguous.
104-
for firstSectorIndex+nContiguous <= lastSectorIndex &&
105-
uint64(f.sectors[firstSectorIndex+nContiguous]) == uint64(firstSector)+uint64(nContiguous) {
106-
nContiguous++
103+
n := lastSectorIndex - firstSectorIndex + 1
104+
for i := uint64(0); i < n; i++ {
105+
if uint64(f.sm.GetPhysicalIndex(i+firstSectorIndex)) != uint64(firstSector)+i {
106+
break
107+
}
108+
nContiguous = int(i + 1)
107109
}
108110
}
109111
return firstSector, nContiguous
@@ -129,72 +131,70 @@ func (f *blockDeviceBackedFile) GetNextRegionOffset(off int64, regionType filesy
129131
}
130132

131133
sectorSizeBytes := int64(f.fp.sectorSizeBytes)
134+
sectorIndex := uint64(off / sectorSizeBytes)
132135
switch regionType {
133136
case filesystem.Data:
134-
sectorIndex := int(off / sectorSizeBytes)
135-
if sectorIndex >= len(f.sectors) {
136-
// Inside the hole at the end of the file.
137-
return f.holeSource.GetNextRegionOffset(off, filesystem.Data)
138-
}
139-
if f.sectors[sectorIndex] != 0 {
137+
if f.sm.GetPhysicalIndex(sectorIndex) != 0 {
140138
// Already inside a sector containing data.
141139
return off, nil
142140
}
143141

144142
// Find the next sector containing data.
145-
sectorIndex++
146-
for f.sectors[sectorIndex] == 0 {
147-
sectorIndex++
143+
sectorIndex, err := f.sm.GetNextMappedSector(sectorIndex)
144+
if err == io.EOF {
145+
// No more mapped sectors. Defer to holeSource.
146+
return f.holeSource.GetNextRegionOffset(off, filesystem.Data)
147+
} else if err != nil {
148+
return 0, status.Errorf(codes.Internal, "Failed to get next mapped sector: %v", err)
148149
}
149150
sectorOffsetBytes := int64(sectorIndex) * sectorSizeBytes
150151

151152
// Also consider data provided by the hole source.
152153
holeSourceOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Data)
153-
if err != nil {
154-
if err == io.EOF {
155-
return sectorOffsetBytes, nil
156-
}
157-
return 0, err
154+
if err == io.EOF {
155+
return sectorOffsetBytes, nil
156+
} else if err != nil {
157+
return 0, status.Errorf(codes.Internal, "Failed to get next mapped region from hole source: %v", err)
158158
}
159159
return min(sectorOffsetBytes, holeSourceOffsetBytes), nil
160160
case filesystem.Hole:
161161
for {
162-
// Progress to the next hole in the file.
163-
sectorIndex := int(off / sectorSizeBytes)
164-
if sectorIndex < len(f.sectors) && f.sectors[sectorIndex] != 0 {
165-
for sectorIndex++; sectorIndex < len(f.sectors); sectorIndex++ {
166-
if f.sectors[sectorIndex] == 0 {
167-
break
168-
}
169-
}
170-
off = int64(sectorIndex) * sectorSizeBytes
171-
}
172-
if uint64(off) >= f.sizeBytes {
162+
sectorIndex := uint64(off / sectorSizeBytes)
163+
index, err := f.sm.GetNextUnmappedSector(sectorIndex)
164+
if err == io.EOF {
165+
// This can not happen with the current
166+
// implementation of SectorMap.
173167
return int64(f.sizeBytes), nil
168+
} else if err != nil {
169+
return 0, status.Errorf(codes.Internal, "Failed to get next unmapped sector: %v", err)
174170
}
175-
176-
// Progress to the next hole in the hole source.
177-
holeSourceOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Hole)
178-
if err != nil {
179-
if err == io.EOF {
180-
return off, nil
181-
}
182-
return 0, err
171+
nextSectorOffsetBytes := max(off, int64(index)*int64(sectorSizeBytes))
172+
nextHoleOffsetBytes, err := f.holeSource.GetNextRegionOffset(off, filesystem.Hole)
173+
if err == io.EOF {
174+
// Hole source has given up, sector
175+
// offset decides.
176+
return min(int64(f.sizeBytes), nextSectorOffsetBytes), nil
177+
} else if err != nil {
178+
return 0, status.Errorf(codes.Internal, "Failed to get next unmapped region from hole source: %v", err)
179+
}
180+
off = max(nextHoleOffsetBytes, nextSectorOffsetBytes)
181+
if off > int64(f.sizeBytes) {
182+
// Next hole is at end of file.
183+
return int64(f.sizeBytes), nil
183184
}
184-
if holeSourceOffsetBytes < int64(sectorIndex+1)*sectorSizeBytes {
185-
// Found an offset that refers both to a
186-
// hole in the file and one in the hole
187-
// source.
188-
return holeSourceOffsetBytes, nil
185+
if nextHoleOffsetBytes == nextSectorOffsetBytes {
186+
// Both the sector map and the hole
187+
// source agrees that this is a hole.
188+
return off, nil
189189
}
190-
off = holeSourceOffsetBytes
190+
// Continue.
191191
}
192192
default:
193193
panic("Unknown region type")
194194
}
195195
}
196196

197-
func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex, offsetWithinSector int) (int, error) {
197+
func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex uint64, offsetWithinSector int) (int, error) {
198198
n, err := f.holeSource.ReadAt(p, int64(sectorIndex)*int64(f.fp.sectorSizeBytes)+int64(offsetWithinSector))
199199
if err != nil {
200200
return n, err
@@ -209,14 +209,7 @@ func (f *blockDeviceBackedFile) readFromHoleSource(p []byte, sectorIndex, offset
209209
// attempts to read as much data into the output buffer as is possible
210210
// in a single read operation. If the file is fragmented, multiple reads
211211
// are necessary, requiring this function to be called repeatedly.
212-
func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
213-
if sectorIndex >= len(f.sectors) {
214-
// Attempted to read from a hole located at the
215-
// end of the file. Redirect the read to the hole
216-
// source, which usually produces zeroes.
217-
return f.readFromHoleSource(p, sectorIndex, offsetWithinSector)
218-
}
219-
212+
func (f *blockDeviceBackedFile) readFromSectors(p []byte, sectorIndex, lastSectorIndex uint64, offsetWithinSector int) (int, error) {
220213
sector, sectorsToRead := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
221214
p = f.limitBufferToSectorBoundary(p, sectorsToRead, offsetWithinSector)
222215
if sector == 0 {
@@ -278,18 +271,9 @@ func (f *blockDeviceBackedFile) ReadAt(p []byte, off int64) (int, error) {
278271
}
279272

280273
// truncateSectors truncates a file to a given number of sectors.
281-
func (f *blockDeviceBackedFile) truncateSectors(sectorCount int) {
282-
if len(f.sectors) > sectorCount {
283-
f.fp.sectorAllocator.FreeList(f.sectors[sectorCount:])
284-
f.sectors = f.sectors[:sectorCount]
285-
286-
// Ensure that no hole remains at the end, as that would
287-
// lead to unnecessary fragmentation when growing the
288-
// file again.
289-
for len(f.sectors) > 0 && f.sectors[len(f.sectors)-1] == 0 {
290-
f.sectors = f.sectors[:len(f.sectors)-1]
291-
}
292-
}
274+
func (f *blockDeviceBackedFile) truncateSectors(sectorCount uint64) {
275+
f.sm.FreeSectors(sectorCount, f.fp.sectorAllocator.FreeList)
276+
f.sm.Truncate(sectorCount)
293277
}
294278

295279
func (f *blockDeviceBackedFile) Sync() error {
@@ -303,19 +287,19 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
303287
return status.Errorf(codes.InvalidArgument, "Negative truncation size: %d", size)
304288
}
305289

306-
sectorIndex := int(size / int64(f.fp.sectorSizeBytes))
290+
sectorIndex := uint64(size / int64(f.fp.sectorSizeBytes))
307291
offsetWithinSector := int(size % int64(f.fp.sectorSizeBytes))
308292
if offsetWithinSector == 0 {
309293
// Truncating to an exact number of sectors.
310294
f.truncateSectors(sectorIndex)
311295
} else {
312296
// Truncating to partially into a sector.
313-
if uint64(size) < f.sizeBytes && sectorIndex < len(f.sectors) && f.sectors[sectorIndex] != 0 {
297+
sector := f.sm.GetPhysicalIndex(sectorIndex)
298+
if uint64(size) < f.sizeBytes && sector != 0 {
314299
// The file is being shrunk and the new last
315300
// sector is not a hole. Zero the trailing part
316301
// of the last sector to ensure that growing the
317302
// file later on doesn't bring back old data.
318-
sector := f.sectors[sectorIndex]
319303
zeroes := f.fp.zeroSector[:f.fp.sectorSizeBytes-offsetWithinSector]
320304
if diff := f.sizeBytes - uint64(size); uint64(len(zeroes)) > diff {
321305
zeroes = zeroes[:diff]
@@ -343,7 +327,7 @@ func (f *blockDeviceBackedFile) Truncate(size int64) error {
343327
// writeToNewSectors is used to write data into new sectors. This
344328
// function is called when holes in a sparse file are filled up or when
345329
// data is appended to the end of a file.
346-
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, offsetWithinSector int) (int, uint32, int, error) {
330+
func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex uint64, offsetWithinSector int) (int, uint32, int, error) {
347331
// Allocate space to store the data.
348332
sectorsToAllocate := int((uint64(offsetWithinSector) + uint64(len(p)) + uint64(f.fp.sectorSizeBytes) - 1) / uint64(f.fp.sectorSizeBytes))
349333
firstSector, sectorsAllocated, err := f.fp.sectorAllocator.AllocateContiguous(sectorsToAllocate)
@@ -395,7 +379,7 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, of
395379
}
396380
p = p[fullSectorsSize:]
397381
sector += uint32(fullSectors)
398-
sectorIndex += int(fullSectors)
382+
sectorIndex += uint64(fullSectors)
399383
}
400384

401385
// Write the last sector separately when we need to introduce
@@ -415,38 +399,12 @@ func (f *blockDeviceBackedFile) writeToNewSectors(p []byte, firstSectorIndex, of
415399
return nWritten, firstSector, sectorsAllocated, nil
416400
}
417401

418-
// insertSectorsContiguous inserts a series of contiguous sectors into a
419-
// file. This function is used to update a file after appending data to
420-
// it or filling up a hole in a sparse file.
421-
func (f *blockDeviceBackedFile) insertSectorsContiguous(firstSectorIndex int, firstSector uint32, count int) {
422-
for i := 0; i < count; i++ {
423-
sectorIndex := firstSectorIndex + i
424-
if f.sectors[sectorIndex] != 0 {
425-
panic(fmt.Sprintf("Attempted to replace existing sector at index %d", sectorIndex))
426-
}
427-
f.sectors[sectorIndex] = firstSector + uint32(i)
428-
}
429-
}
430-
431402
// writeToSectors performs a single write against the block device. It
432403
// attempts to write as much data from the input buffer as is possible
433404
// in a single write operation. If the file is fragmented, multiple
434405
// writes are necessary, requiring this function to be called
435406
// repeatedly.
436-
func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSectorIndex, offsetWithinSector int) (int, error) {
437-
if sectorIndex >= len(f.sectors) {
438-
// Attempted to write past the end-of-file or within a
439-
// hole located at the end of a sparse file. Allocate
440-
// space and grow the file.
441-
bytesWritten, firstSector, sectorsAllocated, err := f.writeToNewSectors(p, sectorIndex, offsetWithinSector)
442-
if err != nil {
443-
return 0, err
444-
}
445-
f.sectors = append(f.sectors, make([]uint32, sectorIndex+sectorsAllocated-len(f.sectors))...)
446-
f.insertSectorsContiguous(sectorIndex, firstSector, sectorsAllocated)
447-
return bytesWritten, nil
448-
}
449-
407+
func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSectorIndex uint64, offsetWithinSector int) (int, error) {
450408
sector, sectorsToWrite := f.getSectorsContiguous(sectorIndex, lastSectorIndex)
451409
p = f.limitBufferToSectorBoundary(p, sectorsToWrite, offsetWithinSector)
452410
if sector == 0 {
@@ -456,7 +414,14 @@ func (f *blockDeviceBackedFile) writeToSectors(p []byte, sectorIndex, lastSector
456414
if err != nil {
457415
return 0, err
458416
}
459-
f.insertSectorsContiguous(sectorIndex, firstSector, sectorsAllocated)
417+
err = f.sm.InsertSectorsContiguous(sectorIndex, firstSector, uint32(sectorsAllocated))
418+
if err != nil {
419+
// There was an error inserting the sector
420+
// mapping, give the sectors back to the sector
421+
// allocator.
422+
f.fp.sectorAllocator.FreeContiguous(firstSector, sectorsAllocated)
423+
return 0, status.Errorf(codes.Internal, "Unable to insert sectors into sector map: %v", err)
424+
}
460425
return bytesWritten, nil
461426
}
462427

0 commit comments

Comments
 (0)