Skip to content

Commit d693cf4

Browse files
nixprimegvisor-bot
authored andcommitted
aio: add LinuxQueue
This is used in cl/769787293. PiperOrigin-RevId: 787248248
1 parent db4fdd7 commit d693cf4

File tree

8 files changed

+281
-11
lines changed

8 files changed

+281
-11
lines changed

nogo.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ analyzers:
188188
internal:
189189
exclude:
190190
- ".*_test.go" # Exclude tests.
191+
- pkg/aio/aio_linux_unsafe.go # Special case.
191192
- pkg/eventfd/eventfd_unsafe.go # Special case.
192193
- "pkg/flipcall/.*_unsafe.go" # Special case.
193194
- pkg/gohacks/noescape_unsafe.go # Special case.

pkg/abi/linux/aio.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,26 @@ package linux
1616

1717
import "encoding/binary"
1818

19+
// AIORing is struct aio_ring, from fs/aio.c, without the trailing
20+
// variable-length array.
21+
type AIORing struct {
22+
ID uint32
23+
Nr uint32
24+
Head uint32
25+
Tail uint32
26+
Magic uint32
27+
CompatFeatures uint32
28+
IncompatFeatures uint32
29+
HeaderLength uint32
30+
}
31+
1932
// AIORingSize is sizeof(struct aio_ring).
2033
const AIORingSize = 32
2134

35+
// AIO_RING_MAGIC is fs/aio.c:AIO_RING_MAGIC, the expected value of
36+
// AIORing.Magic.
37+
const AIO_RING_MAGIC = 0xa10a10a1
38+
2239
// I/O commands.
2340
const (
2441
IOCB_CMD_PREAD = 0

pkg/aio/BUILD

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ go_library(
99
name = "aio",
1010
srcs = [
1111
"aio.go",
12+
"aio_linux_unsafe.go",
1213
"aio_unsafe.go",
1314
],
1415
visibility = ["//pkg/sentry:internal"],
1516
deps = [
17+
"//pkg/abi/linux",
1618
"//pkg/gomaxprocs",
1719
"//pkg/sync",
1820
"@org_golang_x_sys//unix:go_default_library",
@@ -22,7 +24,10 @@ go_library(
2224
go_test(
2325
name = "aio_test",
2426
size = "small",
25-
srcs = ["aio_test.go"],
27+
srcs = [
28+
"aio_linux_test.go",
29+
"aio_test.go",
30+
],
2631
library = ":aio",
2732
deps = [
2833
"//pkg/bitmap",

pkg/aio/aio.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
// limitations under the License.
1414

1515
// Package aio provides asynchronous I/O on host file descriptors.
16+
//
17+
// +checkalignedignore
1618
package aio
1719

1820
import (

pkg/aio/aio_linux_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2024 The gVisor Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build linux
16+
// +build linux
17+
18+
package aio
19+
20+
import (
21+
"testing"
22+
)
23+
24+
func TestLinuxQueue(t *testing.T) {
25+
testQueue(t, func(cap int) (Queue, error) {
26+
q, err := NewLinuxQueue(cap)
27+
return q, err
28+
})
29+
}

pkg/aio/aio_linux_unsafe.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright 2025 The gVisor Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
//go:build linux
16+
// +build linux
17+
18+
package aio
19+
20+
import (
21+
"fmt"
22+
"sync/atomic"
23+
"unsafe"
24+
25+
"golang.org/x/sys/unix"
26+
"gvisor.dev/gvisor/pkg/abi/linux"
27+
)
28+
29+
// LinuxQueue implements Queue using Linux native AIO.
30+
//
31+
// In LinuxQueue, I/O requests (struct iocb) are submitted when Wait() =>
32+
// io_submit(2) is called. In Linux, io_submit(2) sequentially invokes the
33+
// usual *synchronous* entry point into file reads/writes
34+
// (file_operations::read/write_iter), but provides a completion callback in
35+
// the kernel I/O control block (struct kiocb); file read/write implementations
36+
// may recognize this as indicating support for asynchronous I/O
37+
// (!is_sync_kiocb()), start the request, and return -EIOCBQUEUED to indicate
38+
// asynchronous completion. This means that submission of a read/write
39+
// operation will block (without submitting later requests) until the operation
40+
// completes, unless the implementation of that operation specifically supports
41+
// asynchronous. In practice, this seems to mean:
42+
//
43+
// - O_DIRECT reads, subject to the constraints described by the man page notes
44+
// for open(2), will probably work.
45+
//
46+
// - O_DIRECT writes, subject to the same constraints, will probably work if
47+
// disk blocks are already allocated.
48+
type LinuxQueue struct {
49+
ctxID uintptr
50+
iocbs []linux.IOCallback
51+
iocbPtrs []*linux.IOCallback
52+
ioevs []linux.IOEvent
53+
}
54+
55+
// NewLinuxQueue returns a new LinuxQueue with the given capacity.
56+
func NewLinuxQueue(cap int) (*LinuxQueue, error) {
57+
var ctxID uintptr
58+
if _, _, e := unix.Syscall(unix.SYS_IO_SETUP, uintptr(cap), uintptr(unsafe.Pointer(&ctxID)), 0 /* unused */); e != 0 {
59+
return nil, e
60+
}
61+
q := &LinuxQueue{
62+
ctxID: ctxID,
63+
iocbs: make([]linux.IOCallback, cap),
64+
iocbPtrs: make([]*linux.IOCallback, cap),
65+
ioevs: make([]linux.IOEvent, cap),
66+
}
67+
for i := range q.iocbPtrs {
68+
q.iocbPtrs[i] = &q.iocbs[i]
69+
}
70+
q.iocbs = q.iocbs[:0]
71+
return q, nil
72+
}
73+
74+
// Destroy implements Queue.Destroy.
75+
func (q *LinuxQueue) Destroy() {
76+
unix.Syscall(unix.SYS_IO_DESTROY, q.ctxID, 0 /* unused */, 0 /* unused */)
77+
}
78+
79+
// Cap implements Queue.Cap.
80+
func (q *LinuxQueue) Cap() int {
81+
return len(q.ioevs)
82+
}
83+
84+
// Add implements Queue.Add.
85+
func (q *LinuxQueue) Add(r Request) {
86+
iocb := linux.IOCallback{
87+
Data: r.ID,
88+
FD: r.FD,
89+
Buf: uint64(uintptr(r.Buf)),
90+
Bytes: uint64(r.Len),
91+
Offset: r.Off,
92+
}
93+
switch r.Op {
94+
case OpRead:
95+
iocb.OpCode = linux.IOCB_CMD_PREAD
96+
case OpWrite:
97+
iocb.OpCode = linux.IOCB_CMD_PWRITE
98+
case OpReadv:
99+
iocb.OpCode = linux.IOCB_CMD_PREADV
100+
case OpWritev:
101+
iocb.OpCode = linux.IOCB_CMD_PWRITEV
102+
default:
103+
panic(fmt.Sprintf("unknown op %v", r.Op))
104+
}
105+
q.iocbs = append(q.iocbs, iocb)
106+
}
107+
108+
// Wait implements Queue.Wait.
109+
func (q *LinuxQueue) Wait(cs []Completion, minCompletions int) ([]Completion, error) {
110+
if len(q.iocbs) != 0 {
111+
_, _, e := unix.Syscall(unix.SYS_IO_SUBMIT, q.ctxID, uintptr(len(q.iocbs)), uintptr(unsafe.Pointer(unsafe.SliceData(q.iocbPtrs))))
112+
q.iocbs = q.iocbs[:0]
113+
if e != 0 {
114+
return cs, e
115+
}
116+
}
117+
for {
118+
if ring := q.ring(); ring.Magic == linux.AIO_RING_MAGIC {
119+
nr := ring.Nr
120+
head := ring.Head
121+
if head >= nr {
122+
panic(fmt.Sprintf("aio_ring::head (%d) >= aio_ring::nr (%d)", head, nr))
123+
}
124+
origHead := head
125+
tail := atomic.LoadUint32(&ring.Tail)
126+
for head != tail {
127+
ioev := q.ringAt(head)
128+
cs = append(cs, Completion{
129+
ID: ioev.Data,
130+
Result: ioev.Result,
131+
})
132+
head++
133+
if head >= nr {
134+
head = 0
135+
}
136+
minCompletions--
137+
}
138+
if head != origHead {
139+
atomic.StoreUint32(&ring.Head, head)
140+
if minCompletions <= 0 {
141+
return cs, nil
142+
}
143+
}
144+
}
145+
n, _, e := unix.Syscall6(unix.SYS_IO_GETEVENTS, q.ctxID, uintptr(minCompletions), uintptr(len(q.ioevs)), uintptr(unsafe.Pointer(unsafe.SliceData(q.ioevs))), 0 /* timeout */, 0 /* unused */)
146+
if e != 0 {
147+
if e == unix.EINTR {
148+
continue
149+
}
150+
return cs, e
151+
}
152+
for i := range q.ioevs[:n] {
153+
ioev := &q.ioevs[i]
154+
cs = append(cs, Completion{
155+
ID: ioev.Data,
156+
Result: ioev.Result,
157+
})
158+
}
159+
return cs, nil
160+
}
161+
}
162+
163+
func (q *LinuxQueue) ring() *linux.AIORing {
164+
return (*linux.AIORing)(unsafe.Pointer(q.ctxID))
165+
}
166+
167+
// Preconditions: idx < q.ring().Nr.
168+
func (q *LinuxQueue) ringAt(idx uint32) *linux.IOEvent {
169+
return (*linux.IOEvent)(unsafe.Pointer(q.ctxID + uintptr(q.ring().HeaderLength) + uintptr(idx)*unsafe.Sizeof(linux.IOEvent{})))
170+
}

pkg/aio/aio_test.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import (
2525
"gvisor.dev/gvisor/pkg/rand"
2626
)
2727

28-
func TestRead(t *testing.T) {
28+
func testRead(t *testing.T, newQueue func(cap int) (Queue, error)) {
2929
// Create a temp file.
3030
testFile, err := os.CreateTemp(t.TempDir(), "aio_test_read")
3131
if err != nil {
@@ -46,7 +46,10 @@ func TestRead(t *testing.T) {
4646
}
4747

4848
// Read data from the file using async reads.
49-
q := NewGoQueue(8)
49+
q, err := newQueue(8)
50+
if err != nil {
51+
t.Fatalf("failed to create Queue: %v", err)
52+
}
5053
defer q.Destroy()
5154
qavail := q.Cap()
5255
off := int64(0)
@@ -82,7 +85,7 @@ func TestRead(t *testing.T) {
8285
}
8386
}
8487

85-
func TestReadv(t *testing.T) {
88+
func testReadv(t *testing.T, newQueue func(cap int) (Queue, error)) {
8689
// Create a temp file.
8790
testFile, err := os.CreateTemp(t.TempDir(), "aio_test_readv")
8891
if err != nil {
@@ -103,7 +106,10 @@ func TestReadv(t *testing.T) {
103106
}
104107

105108
// Read data from the file using async vectored reads.
106-
q := NewGoQueue(8)
109+
q, err := newQueue(8)
110+
if err != nil {
111+
t.Fatalf("failed to create Queue: %v", err)
112+
}
107113
defer q.Destroy()
108114
qavail := q.Cap()
109115
iovecsData := make([][2]unix.Iovec, qavail)
@@ -152,7 +158,7 @@ func TestReadv(t *testing.T) {
152158
}
153159
}
154160

155-
func TestWrite(t *testing.T) {
161+
func testWrite(t *testing.T, newQueue func(cap int) (Queue, error)) {
156162
// Create a temp file.
157163
testFile, err := os.CreateTemp(t.TempDir(), "aio_test_write")
158164
if err != nil {
@@ -168,7 +174,10 @@ func TestWrite(t *testing.T) {
168174
_, _ = rand.Read(data)
169175

170176
// Write data to the file using async writes.
171-
q := NewGoQueue(8)
177+
q, err := newQueue(8)
178+
if err != nil {
179+
t.Fatalf("failed to create Queue: %v", err)
180+
}
172181
defer q.Destroy()
173182
qavail := q.Cap()
174183
off := int64(0)
@@ -209,7 +218,7 @@ func TestWrite(t *testing.T) {
209218
}
210219
}
211220

212-
func TestWritev(t *testing.T) {
221+
func testWritev(t *testing.T, newQueue func(cap int) (Queue, error)) {
213222
// Create a temp file.
214223
testFile, err := os.CreateTemp(t.TempDir(), "aio_test_writev")
215224
if err != nil {
@@ -225,7 +234,10 @@ func TestWritev(t *testing.T) {
225234
_, _ = rand.Read(data)
226235

227236
// Write data to the file using async vectored writes.
228-
q := NewGoQueue(8)
237+
q, err := newQueue(8)
238+
if err != nil {
239+
t.Fatalf("failed to create Queue: %v", err)
240+
}
229241
defer q.Destroy()
230242
qavail := q.Cap()
231243
iovecsData := make([][2]unix.Iovec, qavail)
@@ -278,3 +290,33 @@ func TestWritev(t *testing.T) {
278290
t.Errorf("bytes differ")
279291
}
280292
}
293+
294+
func testQueue(t *testing.T, newQueue func(cap int) (Queue, error)) {
295+
t.Run("Read", func(t *testing.T) {
296+
t.Helper()
297+
t.Parallel()
298+
testRead(t, newQueue)
299+
})
300+
t.Run("Readv", func(t *testing.T) {
301+
t.Helper()
302+
t.Parallel()
303+
testReadv(t, newQueue)
304+
})
305+
t.Run("Write", func(t *testing.T) {
306+
t.Helper()
307+
t.Parallel()
308+
testWrite(t, newQueue)
309+
})
310+
t.Run("Writev", func(t *testing.T) {
311+
t.Helper()
312+
t.Parallel()
313+
testWritev(t, newQueue)
314+
})
315+
}
316+
317+
func TestGoQueue(t *testing.T) {
318+
testQueue(t, func(cap int) (Queue, error) {
319+
q := NewGoQueue(cap)
320+
return q, nil
321+
})
322+
}

0 commit comments

Comments
 (0)