1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.extensions;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.File;
25 import java.nio.Buffer;
26 import java.nio.ByteBuffer;
27 import java.nio.channels.ReadableByteChannel;
28 import java.util.concurrent.atomic.AtomicInteger;
29
30 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
31 import org.apache.maven.surefire.api.event.Event;
32 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
33 import org.apache.maven.surefire.extensions.EventHandler;
34 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
35 import org.junit.Test;
36
37 import static java.lang.Math.min;
38 import static java.nio.charset.StandardCharsets.UTF_8;
39 import static java.util.Arrays.copyOfRange;
40 import static java.util.concurrent.TimeUnit.SECONDS;
41 import static org.assertj.core.api.Assertions.assertThat;
42
43
44
45
46 public class EventConsumerThreadTest {
47 @SuppressWarnings("checkstyle:magicnumber")
48 @Test(timeout = 60_000L)
49 public void performanceTest() throws Exception {
50 final long[] staredAt = {0};
51 final long[] finishedAt = {0};
52 final AtomicInteger calls = new AtomicInteger();
53 final int totalCalls = 1_000_000;
54
55 EventHandler<Event> handler = new EventHandler<Event>() {
56 @Override
57 public void handleEvent(@Nonnull Event event) {
58 if (staredAt[0] == 0) {
59 staredAt[0] = System.currentTimeMillis();
60 }
61
62 if (calls.incrementAndGet() == totalCalls) {
63 finishedAt[0] = System.currentTimeMillis();
64 }
65 }
66 };
67
68 final ByteBuffer event = ByteBuffer.allocate(192);
69 event.put(":maven-surefire-event:".getBytes(UTF_8));
70 event.put((byte) 14);
71 event.put(":std-out-stream:".getBytes(UTF_8));
72 event.put((byte) 10);
73 event.put(":normal-run:".getBytes(UTF_8));
74 event.put("\u0001\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0001:".getBytes(UTF_8));
75 event.put((byte) 5);
76 event.put(":UTF-8:".getBytes(UTF_8));
77 event.putInt(100);
78 event.put(
79 ":0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789:"
80 .getBytes(UTF_8));
81
82 ((Buffer) event).flip();
83 byte[] frame = copyOfRange(event.array(), event.arrayOffset(), event.arrayOffset() + event.remaining());
84 ReadableByteChannel channel = new Channel(frame, 100) {
85 private int countRounds;
86
87 @Override
88 public synchronized int read(ByteBuffer dst) {
89 if (countRounds == totalCalls) {
90 return -1;
91 }
92
93 if (remaining() == 0) {
94 countRounds++;
95 i = 0;
96 }
97
98 return super.read(dst);
99 }
100 };
101
102 EventConsumerThread thread = new EventConsumerThread(
103 "t", channel, handler, new CountdownCloseable(new MockCloseable(), 1), new MockForkNodeArguments());
104
105 SECONDS.sleep(2);
106 System.gc();
107 SECONDS.sleep(5);
108
109 System.out.println("Starting the event thread...");
110
111 thread.start();
112 thread.join();
113
114 long execTime = finishedAt[0] - staredAt[0];
115 System.out.println("...executed in " + execTime + " ms");
116
117
118 assertThat(execTime)
119 .describedAs(
120 "The performance test should assert 0.75 s of read time. "
121 + "The limit 3.65 s guarantees that the read time does not exceed this limit on overloaded CPU.")
122 .isPositive()
123 .isLessThanOrEqualTo(3_650L);
124 }
125
126 private static class Channel implements ReadableByteChannel {
127 private final byte[] bytes;
128 private final int chunkSize;
129 protected int i;
130
131 Channel(byte[] bytes, int chunkSize) {
132 this.bytes = bytes;
133 this.chunkSize = chunkSize;
134 }
135
136 @Override
137 public int read(ByteBuffer dst) {
138 if (remaining() == 0) {
139 return -1;
140 } else if (dst.hasRemaining()) {
141 int length = min(min(chunkSize, remaining()), dst.remaining());
142 dst.put(bytes, i, length);
143 i += length;
144 return length;
145 } else {
146 return 0;
147 }
148 }
149
150 protected final int remaining() {
151 return bytes.length - i;
152 }
153
154 @Override
155 public boolean isOpen() {
156 return false;
157 }
158
159 @Override
160 public void close() {}
161 }
162
163 private static class MockCloseable implements Closeable {
164 @Override
165 public void close() {}
166 }
167
168 private static class MockForkNodeArguments implements ForkNodeArguments {
169 @Nonnull
170 @Override
171 public String getSessionId() {
172 return null;
173 }
174
175 @Override
176 public int getForkChannelId() {
177 return 0;
178 }
179
180 @Nonnull
181 @Override
182 public File dumpStreamText(@Nonnull String text) {
183 return null;
184 }
185
186 @Nonnull
187 @Override
188 public File dumpStreamException(@Nonnull Throwable t) {
189 return null;
190 }
191
192 @Override
193 public void logWarningAtEnd(@Nonnull String text) {}
194
195 @Nonnull
196 @Override
197 public ConsoleLogger getConsoleLogger() {
198 return null;
199 }
200
201 @Nonnull
202 @Override
203 public Object getConsoleLock() {
204 return new Object();
205 }
206
207 @Override
208 public File getEventStreamBinaryFile() {
209 return null;
210 }
211
212 @Override
213 public File getCommandStreamBinaryFile() {
214 return null;
215 }
216 }
217 }