summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--otherlibs/systhreads/event.ml18
1 files changed, 15 insertions, 3 deletions
diff --git a/otherlibs/systhreads/event.ml b/otherlibs/systhreads/event.ml
index 7274156bc..4b328486e 100644
--- a/otherlibs/systhreads/event.ml
+++ b/otherlibs/systhreads/event.ml
@@ -28,8 +28,10 @@ type 'a event =
(* Communication channels *)
type 'a channel =
- { writes_pending: 'a communication Queue.t; (* All offers to write on it *)
- reads_pending: 'a communication Queue.t } (* All offers to read from it *)
+ { mutable writes_pending: 'a communication Queue.t;
+ (* All offers to write on it *)
+ mutable reads_pending: 'a communication Queue.t }
+ (* All offers to read from it *)
(* Communication offered *)
and 'a communication =
@@ -125,6 +127,13 @@ let basic_poll genev =
let poll ev =
basic_poll(scramble_array(Array.of_list(flatten_event ev [])))
+(* Remove all communication opportunities already synchronized *)
+
+let cleanup_queue q =
+ let q' = Queue.create() in
+ Queue.iter (fun c -> if !(c.performed) = -1 then Queue.add c q') q;
+ q'
+
(* Event construction *)
let always data =
@@ -156,7 +165,9 @@ let send channel data =
true
with Queue.Empty ->
false);
- suspend = (fun () -> Queue.add wcomm channel.writes_pending);
+ suspend = (fun () ->
+ channel.writes_pending <- cleanup_queue channel.writes_pending;
+ Queue.add wcomm channel.writes_pending);
result = (fun () -> ()) })
let receive channel =
@@ -183,6 +194,7 @@ let receive channel =
with Queue.Empty ->
false);
suspend = (fun () ->
+ channel.reads_pending <- cleanup_queue channel.reads_pending;
Queue.add rcomm channel.reads_pending);
result = (fun () ->
match rcomm.data with