1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.booterclient.output;
20
21 import javax.annotation.Nonnull;
22
23 import java.util.concurrent.Callable;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.FutureTask;
26
27 import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
28 import org.apache.maven.surefire.api.event.Event;
29 import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
30 import org.apache.maven.surefire.extensions.EventHandler;
31 import org.junit.Test;
32
33 import static java.util.concurrent.TimeUnit.SECONDS;
34 import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
35 import static org.assertj.core.api.Assertions.assertThat;
36
37
38
39
40 @SuppressWarnings("checkstyle:magicnumber")
41 public class ThreadedStreamConsumerTest {
42 @Test
43 public void testQueueSynchronizer() throws Exception {
44 final CountDownLatch countDown = new CountDownLatch(5_000_000);
45 final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>(8 * 1024, null);
46
47 Thread t = new Thread() {
48 @Override
49 public void run() {
50 while (true) {
51 try {
52 sync.awaitNext();
53 countDown.countDown();
54 } catch (InterruptedException e) {
55 throw new IllegalStateException(e);
56 }
57 }
58 }
59 };
60 t.setDaemon(true);
61 t.start();
62
63 SECONDS.sleep(1);
64 System.gc();
65 SECONDS.sleep(2);
66
67 long t1 = System.currentTimeMillis();
68
69 for (int i = 0; i < 5_000_000; i++) {
70 sync.pushNext(i);
71 }
72
73 assertThat(countDown.await(3L, SECONDS)).isTrue();
74
75 long t2 = System.currentTimeMillis();
76 System.out.println((t2 - t1) + " millis in testQueueSynchronizer()");
77 }
78
79 @Test
80 public void testThreadedStreamConsumer() throws Exception {
81 final CountDownLatch countDown = new CountDownLatch(5_000_000);
82 EventHandler<Event> handler = new EventHandler<Event>() {
83 @Override
84 public void handleEvent(@Nonnull Event event) {
85 countDown.countDown();
86 }
87 };
88
89 ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer(handler);
90
91 SECONDS.sleep(1);
92 System.gc();
93 SECONDS.sleep(2);
94
95 long t1 = System.currentTimeMillis();
96
97 Event event = new StandardStreamOutWithNewLineEvent(NORMAL_RUN, 1L, "");
98 for (int i = 0; i < 5_000_000; i++) {
99 streamConsumer.handleEvent(event);
100 }
101
102 assertThat(countDown.await(3L, SECONDS)).isTrue();
103
104 long t2 = System.currentTimeMillis();
105 System.out.println((t2 - t1) + " millis in testThreadedStreamConsumer()");
106
107 streamConsumer.close();
108 }
109
110 @Test
111 public void testBasicStatus() throws Exception {
112 final QueueSynchronizer<String> sync = new QueueSynchronizer<>(2, null);
113 sync.pushNext("1");
114 sync.pushNext("2");
115 String s1 = sync.awaitNext();
116 String s2 = sync.awaitNext();
117 assertThat(s1).isEqualTo("1");
118 assertThat(s2).isEqualTo("2");
119 FutureTask<Void> future = new FutureTask<>(new Callable<Void>() {
120 @Override
121 public Void call() throws Exception {
122 sync.awaitNext();
123 return null;
124 }
125 });
126 Thread t = new Thread(future);
127 t.setDaemon(true);
128 t.start();
129 SECONDS.sleep(3L);
130 assertThat(t.getState()).isEqualTo(Thread.State.WAITING);
131 }
132 }