Skip to content

Commit 264f7b5

Browse files
committed
fix: libuv async_t issue
1 parent 6120ce8 commit 264f7b5

File tree

6 files changed

+80
-71
lines changed

6 files changed

+80
-71
lines changed

immix

planglib/std/chan.pi

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,32 @@ impl <S>Chan<S> {
7474
condvar_wait(self.condvar, *self.mtx);
7575
}
7676
let s = self.buffer.pop();
77-
self.count = self.count - 1;
7877

7978
if self.count == self.capacity {
8079
condvar_notify(self.condvar);
8180
}
8281

82+
self.count = self.count - 1;
8383
unlock_mutex(*self.mtx);
8484
return s;
8585
}
86+
87+
pub fn recv_all() [S] {
88+
lock_mutex(*self.mtx);
89+
if self.count == self.capacity {
90+
condvar_notify(self.condvar);
91+
}
92+
let ret = [S*self.count as i64;];
93+
94+
for let i = 0; i < self.count as i64; i = i + 1 {
95+
ret[i] = self.buffer.pop();
96+
}
97+
self.count = 0;
98+
99+
100+
unlock_mutex(*self.mtx);
101+
return ret;
102+
}
86103
pub fn len() u64 {
87104

88105
return self.count;

planglib/std/task/reactor.pi

Lines changed: 58 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,10 @@ struct StreamReadEV {
5757
var GLOBAL_REACTOR = new_uv_reactor();
5858

5959

60-
use std::io;
6160

6261
fn timer_cb(timer:*libuv::uv_timer_t) void {
6362
let ev = libuv::get_data_for_handle<libuv::uv_timer_t|TimerEV>(timer);
64-
io::print_s("timer_cb\n");
6563
ev.wake();
66-
io::print_s("timer_cb end\n");
6764
let cb = unsafe_cast<()>(&close_cb);
6865
let handle = unsafe_cast<()>(timer);
6966
libuv::uv_close(handle,cb);
@@ -120,62 +117,66 @@ use std::io;
120117
fn async_cb(async_t:*libuv::uv_async_t) void {
121118
let reactor = libuv::get_data_for_handle<libuv::uv_async_t|UVReactor>(async_t);
122119
let ch = reactor.ch;
123-
let ev = ch.recv();
124-
match ev {
125-
TimerEV(ev) => {
126-
let timer = libuv::new_uv_timer_t();
127-
gc::keep_alive_pinned(timer);
128-
let cb = unsafe_cast<()>(&timer_cb);
129-
gc::keep_alive_pinned(cb);
130-
libuv::uv_timer_init(libuv::uv_default_loop(), timer);
131-
libuv::set_data_for_handle(timer, &ev);
132-
libuv::uv_timer_start(timer, cb, ev.timeout_ms, ev.repeat_ms);
133-
134-
}
135-
TCPListenEV(ev) => {
136-
gc::keep_alive_pinned(ev.handle);
137-
libuv::uv_tcp_init(reactor.loop, ev.handle);
138-
libuv::uv_tcp_bind(ev.handle, &ev.addr, 0 as u32);
139-
libuv::set_data_for_handle(ev.handle, &ev.on_connect);
140-
let cb = unsafe_cast<()>(&conn_cb);
141-
142-
libuv::uv_listen(unsafe_cast<libuv::uv_stream_t>(ev.handle), 128 as i32, cb);
143-
144-
145-
}
146-
StreamReadEV(ev) => {
147-
gc::keep_alive_pinned(ev.handle);
148-
let cb = unsafe_cast<()>(&read_cb);
149-
let alloc_cb = unsafe_cast<()>(&alloc_cb);
150-
libuv::uv_read_start(ev.handle, alloc_cb, cb);
151-
libuv::set_data_for_handle(ev.handle, &ev);
152-
153-
}
154-
ReadStopEV(ev) => {
155-
libuv::uv_read_stop(ev.handle);
156-
}
157-
WriteEV(ev) => {
158-
let req = libuv::new_uv_write_t();
159-
gc::keep_alive_pinned(req);
160-
161-
let raw_slice_p = &ev.buf[0];
162-
gc::pin(raw_slice_p);
163-
let buf = libuv::uv_buf_init(&ev.buf[0], ev.buf.len() as u32);
164-
let buf_p = &buf;
165-
gc::pin(buf_p);
166-
libuv::set_data_for_handle(req, &ev);
167-
168-
let cb = unsafe_cast<()>(&write_cb);
169-
libuv::uv_write(req, ev.handle, &buf, 1 as u32, cb);
170-
libuv::set_data_for_handle(req, &ev);
171-
}
172-
StopEV(ev) => {
173-
libuv::uv_stop(reactor.loop);
174-
libuv::uv_loop_close(GLOBAL_REACTOR.loop);
175-
}
176-
_ => {
120+
let evs = ch.recv_all();
121+
for let i = 0; i < arr_len(evs); i = i + 1 {
122+
let ev = evs[i];
123+
match ev {
124+
TimerEV(ev) => {
125+
let timer = libuv::new_uv_timer_t();
126+
gc::keep_alive_pinned(timer);
127+
let cb = unsafe_cast<()>(&timer_cb);
128+
gc::keep_alive_pinned(cb);
129+
libuv::uv_timer_init(libuv::uv_default_loop(), timer);
130+
libuv::set_data_for_handle(timer, &ev);
131+
libuv::uv_timer_start(timer, cb, ev.timeout_ms, ev.repeat_ms);
132+
133+
}
134+
TCPListenEV(ev) => {
135+
gc::keep_alive_pinned(ev.handle);
136+
libuv::uv_tcp_init(reactor.loop, ev.handle);
137+
libuv::uv_tcp_bind(ev.handle, &ev.addr, 0 as u32);
138+
libuv::set_data_for_handle(ev.handle, &ev.on_connect);
139+
let cb = unsafe_cast<()>(&conn_cb);
140+
141+
libuv::uv_listen(unsafe_cast<libuv::uv_stream_t>(ev.handle), 128 as i32, cb);
142+
143+
144+
}
145+
StreamReadEV(ev) => {
146+
gc::keep_alive_pinned(ev.handle);
147+
let cb = unsafe_cast<()>(&read_cb);
148+
let alloc_cb = unsafe_cast<()>(&alloc_cb);
149+
libuv::uv_read_start(ev.handle, alloc_cb, cb);
150+
libuv::set_data_for_handle(ev.handle, &ev);
151+
152+
}
153+
ReadStopEV(ev) => {
154+
libuv::uv_read_stop(ev.handle);
155+
}
156+
WriteEV(ev) => {
157+
let req = libuv::new_uv_write_t();
158+
gc::keep_alive_pinned(req);
159+
160+
let raw_slice_p = &ev.buf[0];
161+
gc::pin(raw_slice_p);
162+
let buf = libuv::uv_buf_init(&ev.buf[0], ev.buf.len() as u32);
163+
let buf_p = &buf;
164+
gc::pin(buf_p);
165+
libuv::set_data_for_handle(req, &ev);
166+
167+
let cb = unsafe_cast<()>(&write_cb);
168+
libuv::uv_write(req, ev.handle, &buf, 1 as u32, cb);
169+
libuv::set_data_for_handle(req, &ev);
170+
}
171+
StopEV(ev) => {
172+
libuv::uv_stop(reactor.loop);
173+
libuv::uv_loop_close(GLOBAL_REACTOR.loop);
174+
}
175+
_ => {
176+
}
177177
}
178178
}
179+
179180
return;
180181
}
181182

src/ast/node/control.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ impl Node for MatchNode {
984984
.unwrap_or_default()
985985
.get_value();
986986
let value = value.unwrap_or_default();
987-
let ty = value.get_ty();
987+
let ty = get_type_deep(value.get_ty());
988988
let value = value.get_value();
989989
match &*ty.borrow() {
990990
PLType::Struct(_) | PLType::Primitive(_) | PLType::Union(_) => (),

test/main.pi

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ use project1::test::inference;
3333
use project1::test::rand;
3434
use project1::test::_hashtable;
3535
use project1::test::_io;
36-
// use project1::test::future_test;
36+
use project1::test::future_test;
3737
use project1::test::std_test;
3838

3939

@@ -71,7 +71,7 @@ async fn main() Task<()> {
7171
arr::test_arr();
7272
_hashtable::test_hashtable();
7373
_io::test_io();
74-
// future_test::test_future();
74+
future_test::test_future();
7575
std_test::test_std();
7676

7777
await std_test::test_nested_async_closure();

vm/src/mutex/mod.rs

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ fn create_mutex(mutex: *mut *mut OpaqueMutex) -> u64 {
2222
}))
2323
.cast();
2424
fn drop_mutex_f(mutex: *mut u8) {
25-
eprintln!("drop_mutex_f {:p}", mutex);
2625
unsafe {
2726
drop(Box::from_raw(mutex.cast::<MutexContainer>()));
2827
}
@@ -33,13 +32,11 @@ fn create_mutex(mutex: *mut *mut OpaqueMutex) -> u64 {
3332

3433
#[is_runtime]
3534
fn lock_mutex(mutex: *mut OpaqueMutex) -> u64 {
36-
eprintln!("lock_mutex {:p}", mutex);
3735
let container: &MutexContainer = &*mutex.cast();
3836
// immix::thread_stuck_start();
3937
let lock: MutexGuard<'static, _> = mem::transmute(container.mutex.lock().unwrap());
4038
// immix::thread_stuck_end();
4139
container.guard.set(Some(lock));
42-
eprintln!("lock_mutex end {:p}", mutex);
4340
0
4441
}
4542

@@ -69,30 +66,24 @@ fn drop_condvar(cond: *mut Condvar) -> u64 {
6966

7067
#[is_runtime]
7168
fn condvar_wait(cond: *mut Condvar, mutex: *mut OpaqueMutex) -> u64 {
72-
eprintln!("condvar_wait {:p} {:p}", cond, mutex);
7369
let container: &MutexContainer = &*mutex.cast();
7470
let lock = container.guard.replace(None).unwrap();
7571
let cond = unsafe { &*cond };
7672
let lock = cond.wait::<()>(lock).unwrap();
7773
container.guard.set(Some(lock));
78-
eprintln!("condvar_wait end {:p} {:p}", cond, mutex);
7974
0
8075
}
8176

8277
#[is_runtime]
8378
fn condvar_notify(cond: *mut Condvar) -> u64 {
84-
eprintln!("condvar_notify {:p}", cond);
8579
let cond = unsafe { &*cond };
8680
cond.notify_one();
87-
eprintln!("condvar_notify end {:p}", cond);
8881
0
8982
}
9083

9184
#[is_runtime]
9285
fn condvar_notify_all(cond: *mut Condvar) -> u64 {
93-
eprintln!("condvar_notify_all {:p}", cond);
9486
let cond = unsafe { &*cond };
9587
cond.notify_all();
96-
eprintln!("condvar_notify_all end {:p}", cond);
9788
0
9889
}

0 commit comments

Comments
 (0)