Skip to content

Commit aee15a8

Browse files
committed
refactor(async): refactor and tests
1 parent b9c9c3b commit aee15a8

File tree

5 files changed

+135
-20
lines changed

5 files changed

+135
-20
lines changed

lib/src/async/async_queue.dart

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,40 @@ typedef AsyncQueueBlock<T> = Future<T> Function();
2222
/// Serial queue are often used to synchronize access to a specific value or resource to prevent data races to occur.
2323
@internal
2424
class AsyncQueue<T> {
25-
static const _timeout = Duration(seconds: 30);
26-
2725
final _blockS = StreamController<_AsyncQueueEntry<T>>();
28-
final _countS = StreamController<int>(sync: true);
2926
late final DisposeBag _bag;
3027

31-
/// Construct [AsyncQueue].
32-
AsyncQueue(Object key, void Function() onTimeout) {
28+
final _countS = StateSubject<int>(0, sync: true);
29+
30+
/// Construct an [AsyncQueue].
31+
AsyncQueue({
32+
required Object key,
33+
required Duration timeout,
34+
required void Function() onTimeout,
35+
}) {
3336
_bag = DisposeBag(
3437
const <Object>[], '( AsyncQueue ~ $key ~ ${shortHash(this)} )');
3538

3639
_blockS.disposedBy(_bag);
3740
_countS.disposedBy(_bag);
3841

39-
final count$ = _countS.stream
40-
.scan<int?>((acc, value, _) => acc! + value, 0)
41-
.shareValueNotReplay(null);
42-
count$
42+
// when the queue is empty, we wait for a timeout to occur
43+
// and then we call the onTimeout callback.
44+
_countS
4345
.where((count) => count == 0)
44-
.switchMap((_) => Rx.timer<void>(null, _timeout)
45-
.where((_) => count$.value == 0)
46-
.takeUntil(count$.where((count) => count != null && count > 0)))
47-
.listen((_) => onTimeout())
48-
.disposedBy(_bag);
46+
.switchMap((_) => Rx.timer<void>(null, timeout)
47+
.where((_) => _countS.value == 0)
48+
.takeUntil(_countS.where((count) => count > 0)))
49+
.listen((_) {
50+
assert(() {
51+
if (_countS.value != 0) {
52+
throw StateError('AsyncQueue is not empty!');
53+
}
54+
return true;
55+
}());
56+
57+
onTimeout();
58+
}).disposedBy(_bag);
4959

5060
Future<T> executeBlock(_AsyncQueueEntry<T> entry) {
5161
final completer = entry.completer;
@@ -64,7 +74,7 @@ class AsyncQueue<T> {
6474
}).onError<Object>((e, s) {
6575
completer.completeError(e, s);
6676
throw e;
67-
}).whenComplete(() => _countS.add(-1));
77+
}).whenComplete(() => _countS.value = _countS.value - 1);
6878
}
6979

7080
_blockS.stream
@@ -73,7 +83,7 @@ class AsyncQueue<T> {
7383
.disposedBy(_bag);
7484
}
7585

76-
/// Close queue.
86+
/// Close queue, discard all pending entries.
7787
Future<void> dispose() => _bag.dispose();
7888

7989
/// Add block to queue.
@@ -87,7 +97,7 @@ class AsyncQueue<T> {
8797

8898
final completer = Completer<T>.sync();
8999
_blockS.add(_AsyncQueueEntry(completer, block));
90-
_countS.add(1);
100+
_countS.value = _countS.value + 1;
91101
return completer.future;
92102
}
93103
}

lib/src/impl/real_storage.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ class RealRxStorage<Key extends Object, Options,
2020
S extends Storage<Key, Options>> implements RxStorage<Key, Options> {
2121
static const _initialKeyValue = KeyAndValue<Object, Object>(
2222
'rx_storage', 'Petrus Nguyen Thai Hoc <hoc081098@gmail.com>', String);
23+
static const _asyncQueueTimeout = Duration(seconds: 30);
2324

2425
/// Trigger subject
2526
final _keyValuesSubject =
@@ -111,8 +112,9 @@ class RealRxStorage<Key extends Object, Options,
111112
.putIfAbsent(
112113
key,
113114
() => AsyncQueue<Object?>(
114-
key,
115-
() => _writeQueueResources.remove(key)?.dispose(),
115+
key: key,
116+
timeout: _asyncQueueTimeout,
117+
onTimeout: () => _writeQueueResources.remove(key)?.dispose(),
116118
),
117119
)
118120
.enqueue(block)

lib/src/interface/rx_storage.dart

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,12 @@ abstract class RxStorage<Key extends Object, Options>
3030
Stream<Map<Key, Object?>> observeAll([Options options]);
3131

3232
/// Clean up resources - Closes the streams.
33+
///
3334
/// This method should be called when a [RxStorage] is no longer needed.
34-
/// Once `dispose` is called, all streams will `not` emit changed value when value changed.
35+
/// But in a real application, this method is rarely called.
36+
///
37+
/// Once `dispose` is called:
38+
/// - All streams will **not** emit changed value when value changed.
39+
/// - All pending writing tasks will be discarded.
3540
Future<void> dispose();
3641
}

test/async/async_queue_test.dart

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import 'dart:async';
2+
import 'dart:math';
3+
4+
import 'package:rx_storage/src/async/async_queue.dart';
5+
import 'package:rxdart_ext/rxdart_ext.dart';
6+
import 'package:test/test.dart';
7+
8+
void main() {
9+
group('AsyncQueue', () {
10+
const timeout = Duration(milliseconds: 500);
11+
final extraTimeout = (timeout * 1.1).inMilliseconds;
12+
final halfTimeout = (timeout * 0.5).inMilliseconds;
13+
14+
test('execute in serial order', () async {
15+
late AsyncQueue<int> queue;
16+
17+
final running = <int>[];
18+
final ran = <int>[];
19+
var simultaneous = 0;
20+
var timeoutOccurred = false;
21+
22+
queue = AsyncQueue<int>(
23+
key: 'key',
24+
onTimeout: () {
25+
queue.dispose();
26+
timeoutOccurred = true;
27+
},
28+
timeout: timeout,
29+
);
30+
31+
final tasks = <Future<void>>[];
32+
for (var i = 0; i < 1000; i++) {
33+
tasks.add(
34+
queue.enqueue(() async {
35+
running.add(i);
36+
simultaneous = max(simultaneous, running.length);
37+
38+
await delay(1);
39+
40+
ran.add(i);
41+
running.remove(i);
42+
return i;
43+
}),
44+
);
45+
}
46+
47+
await Future.wait(tasks);
48+
expect(simultaneous, lessThanOrEqualTo(1));
49+
expect(ran, [...ran]..sort());
50+
51+
await delay(extraTimeout);
52+
expect(timeoutOccurred, isTrue);
53+
});
54+
55+
test('cannot enqueue task after disposed', () async {
56+
final queue = AsyncQueue<int>(
57+
key: 'key',
58+
onTimeout: () {},
59+
timeout: timeout,
60+
);
61+
62+
await pumpEventQueue();
63+
unawaited(queue.dispose());
64+
65+
expect(
66+
() => queue.enqueue(() => Future.value(0)),
67+
throwsA(isStateError),
68+
);
69+
});
70+
71+
test('onTimeout is called when timeout occurred', () async {
72+
var count = 0;
73+
74+
final queue = AsyncQueue<void>(
75+
key: 'key',
76+
onTimeout: () => ++count,
77+
timeout: timeout,
78+
);
79+
80+
await pumpEventQueue();
81+
82+
for (var i = 0; i < 10; i++) {
83+
unawaited(queue.enqueue(() => Future.value()));
84+
await delay(halfTimeout);
85+
expect(count, 0);
86+
}
87+
88+
await delay(extraTimeout);
89+
expect(count, 1);
90+
});
91+
});
92+
}
93+
94+
void unawaited(Future<void>? future) {}

test/rx_storage_test.dart

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import 'async/async_queue_test.dart' as async_queue_test;
12
import 'logger/default_logger_test.dart' as default_logger_test;
23
import 'logger/logger_adapter_test.dart' as logger_adapter_test;
34
import 'model/key_and_value_test.dart' as key_and_value_test;
@@ -8,6 +9,9 @@ import 'storage/streams_test.dart' as streams_test;
89
void main() async {
910
await perf.main();
1011

12+
// async
13+
async_queue_test.main();
14+
1115
// logger tests
1216
default_logger_test.main();
1317
logger_adapter_test.main();

0 commit comments

Comments
 (0)