1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.plugin.surefire.extensions;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.File;
25 import java.io.IOException;
26 import java.net.URI;
27 import java.nio.ByteBuffer;
28 import java.nio.channels.ReadableByteChannel;
29 import java.nio.channels.WritableByteChannel;
30 import java.util.UUID;
31 import java.util.concurrent.Callable;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.FutureTask;
34 import java.util.concurrent.atomic.AtomicInteger;
35 import java.util.concurrent.atomic.AtomicLong;
36
37 import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
38 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
39 import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
40 import org.apache.maven.surefire.api.booter.Command;
41 import org.apache.maven.surefire.api.event.Event;
42 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
43 import org.apache.maven.surefire.api.report.OutputReportEntry;
44 import org.apache.maven.surefire.api.report.TestOutputReceiver;
45 import org.apache.maven.surefire.api.report.TestOutputReportEntry;
46 import org.apache.maven.surefire.booter.spi.EventChannelEncoder;
47 import org.apache.maven.surefire.booter.spi.SurefireMasterProcessChannelProcessorFactory;
48 import org.apache.maven.surefire.extensions.CommandReader;
49 import org.apache.maven.surefire.extensions.EventHandler;
50 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
51 import org.junit.Rule;
52 import org.junit.Test;
53 import org.junit.rules.ExpectedException;
54
55 import static java.util.concurrent.TimeUnit.HOURS;
56 import static java.util.concurrent.TimeUnit.SECONDS;
57 import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
58 import static org.apache.maven.surefire.api.report.TestOutputReportEntry.stdOutln;
59 import static org.assertj.core.api.Assertions.assertThat;
60 import static org.junit.Assert.fail;
61
62
63
64
65 @SuppressWarnings("checkstyle:magicnumber")
66 public class E2ETest {
67 private static final String LONG_STRING =
68 "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
69
70 @Rule
71 public final ExpectedException e = ExpectedException.none();
72
73 @Test
74 public void endToEndTest() throws Exception {
75 ForkNodeArguments arguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
76
77 final SurefireForkChannel server = new SurefireForkChannel(arguments);
78 server.tryConnectToClient();
79
80 final String connection = server.getForkNodeConnectionString();
81
82 final SurefireMasterProcessChannelProcessorFactory factory = new SurefireMasterProcessChannelProcessorFactory();
83 factory.connect(connection);
84 final EventChannelEncoder encoder = (EventChannelEncoder) factory.createEncoder(arguments);
85
86 final CountDownLatch awaitHandlerFinished = new CountDownLatch(2);
87
88 final AtomicLong readTime = new AtomicLong();
89
90 final int totalCalls = 400_000;
91
92 EventHandler<Event> h = new EventHandler<Event>() {
93 private final AtomicInteger counter = new AtomicInteger();
94 private volatile long t1;
95
96 @Override
97 public void handleEvent(@Nonnull Event event) {
98 try {
99 if (counter.getAndIncrement() == 0) {
100 t1 = System.currentTimeMillis();
101 }
102
103 long t2 = System.currentTimeMillis();
104 long spent = t2 - t1;
105
106 if (counter.get() == totalCalls - 64 * 1024) {
107 readTime.set(spent);
108 System.out.println("spent " + spent + " ms on read");
109 awaitHandlerFinished.countDown();
110 }
111 } catch (Exception e) {
112 e.printStackTrace();
113 }
114 }
115 };
116
117 EventHandler<Event> queue = new ThreadedStreamConsumer(h);
118
119 System.gc();
120
121 SECONDS.sleep(5L);
122
123 server.bindEventHandler(queue, new CountdownCloseable(new DummyCloseable(), 1), new DummyReadableChannel());
124 server.bindCommandReader(new DummyCommandReader(), null);
125
126 Thread t = new Thread() {
127 @Override
128 public void run() {
129 TestOutputReceiver<OutputReportEntry> target = new TestOutputReceiver() {
130 @Override
131 public void writeTestOutput(OutputReportEntry reportEntry) {
132 encoder.testOutput(stdOutln(reportEntry.getLog()));
133 }
134 };
135
136
137
138
139
140
141 try {
142 long t1 = System.currentTimeMillis();
143 for (int i = 0; i < totalCalls; i++) {
144
145 encoder.testOutput(new TestOutputReportEntry(stdOutln(LONG_STRING), NORMAL_RUN, 1L));
146 }
147 long t2 = System.currentTimeMillis();
148 long spent = t2 - t1;
149
150
151 System.out.println("spent " + spent + " ms on write");
152 awaitHandlerFinished.countDown();
153 } catch (Exception e) {
154 e.printStackTrace();
155 }
156 }
157 };
158 t.setDaemon(true);
159 t.start();
160
161 assertThat(awaitHandlerFinished.await(30L, SECONDS)).isTrue();
162
163 factory.close();
164 server.close();
165
166
167
168 assertThat(readTime.get())
169 .describedAs("The performance test should assert 1.0 s of read time. "
170 + "The limit 10 s guarantees that the read time does not exceed this limit on overloaded CPU.")
171 .isPositive()
172 .isLessThanOrEqualTo(10_000L);
173 }
174
175 @Test(timeout = 10_000L)
176 public void shouldVerifyClient() throws Exception {
177 ForkNodeArguments forkNodeArguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
178
179 try (SurefireForkChannel server = new SurefireForkChannel(forkNodeArguments);
180 SurefireMasterProcessChannelProcessorFactory client =
181 new SurefireMasterProcessChannelProcessorFactory()) {
182 FutureTask<String> task = new FutureTask<>(new Callable<String>() {
183 @Override
184 public String call() throws Exception {
185 client.connect(server.getForkNodeConnectionString());
186 return "client connected";
187 }
188 });
189
190 Thread t = new Thread(task);
191 t.setDaemon(true);
192 t.start();
193
194 assertThat(task.get()).isEqualTo("client connected");
195 }
196 }
197
198 @Test(timeout = 10_000L)
199 public void shouldNotVerifyClient() throws Exception {
200 ForkNodeArguments forkNodeArguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
201
202 try (SurefireForkChannel server = new SurefireForkChannel(forkNodeArguments);
203 SurefireMasterProcessChannelProcessorFactory client =
204 new SurefireMasterProcessChannelProcessorFactory()) {
205 FutureTask<String> task = new FutureTask<>(new Callable<String>() {
206 @Override
207 public String call() throws Exception {
208 URI connectionUri = new URI(server.getForkNodeConnectionString());
209 client.connect("tcp://" + connectionUri.getHost() + ":" + connectionUri.getPort()
210 + "?sessionId=6ba7b812-9dad-11d1-80b4-00c04fd430c8");
211 return "client connected";
212 }
213 });
214
215 Thread t = new Thread(task);
216 t.setDaemon(true);
217 t.start();
218
219 e.expect(InvalidSessionIdException.class);
220 e.expectMessage("The actual sessionId '6ba7b812-9dad-11d1-80b4-00c04fd430c8' does not match '"
221 + forkNodeArguments.getSessionId() + "'.");
222
223 server.tryConnectToClient();
224 server.bindCommandReader(new DummyCommandReader(), new DummyWritableByteChannel());
225
226 server.bindEventHandler(
227 new DummyEventHandler(),
228 new CountdownCloseable(new DummyCloseable(), 1),
229 new DummyReadableChannel());
230
231 fail(task.get());
232 }
233 }
234
235 private static class DummyEventHandler<Event> implements EventHandler<Event> {
236 @Override
237 public void handleEvent(@Nonnull Event event) {}
238 }
239
240 private static class DummyReadableChannel implements ReadableByteChannel {
241 private volatile Thread t;
242
243 @Override
244 public int read(ByteBuffer dst) throws IOException {
245 try {
246 t = Thread.currentThread();
247 HOURS.sleep(1L);
248 return 0;
249 } catch (InterruptedException e) {
250 throw new IOException(e.getLocalizedMessage(), e);
251 }
252 }
253
254 @Override
255 public boolean isOpen() {
256 return true;
257 }
258
259 @Override
260 public void close() {
261 if (t != null) {
262 t.interrupt();
263 }
264 }
265 }
266
267 private static class DummyWritableByteChannel implements WritableByteChannel {
268 private volatile Thread t;
269
270 @Override
271 public int write(ByteBuffer src) throws IOException {
272 try {
273 t = Thread.currentThread();
274 HOURS.sleep(1L);
275 return 0;
276 } catch (InterruptedException e) {
277 throw new IOException(e.getLocalizedMessage(), e);
278 }
279 }
280
281 @Override
282 public boolean isOpen() {
283 return true;
284 }
285
286 @Override
287 public void close() throws IOException {
288 if (t != null) {
289 t.interrupt();
290 }
291 }
292 }
293
294 private static class DummyCommandReader implements CommandReader {
295 private volatile Thread t;
296
297 @Override
298 public Command readNextCommand() throws IOException {
299 try {
300 t = Thread.currentThread();
301 HOURS.sleep(1L);
302 return null;
303 } catch (InterruptedException e) {
304 throw new IOException(e.getLocalizedMessage(), e);
305 }
306 }
307
308 @Override
309 public void close() {
310 if (t != null) {
311 t.interrupt();
312 }
313 }
314
315 @Override
316 public boolean isClosed() {
317 return false;
318 }
319 }
320
321 private static class DummyCloseable implements Closeable {
322 @Override
323 public void close() {}
324 }
325
326 private static class Arguments implements ForkNodeArguments {
327 private final String sessionId;
328 private final int id;
329 private final ConsoleLogger logger;
330
331 private Arguments(String sessionId, int id, ConsoleLogger logger) {
332 this.sessionId = sessionId;
333 this.id = id;
334 this.logger = logger;
335 }
336
337 @Nonnull
338 @Override
339 public String getSessionId() {
340 return sessionId;
341 }
342
343 @Override
344 public int getForkChannelId() {
345 return id;
346 }
347
348 @Nonnull
349 @Override
350 public File dumpStreamText(@Nonnull String text) {
351 return new File("");
352 }
353
354 @Nonnull
355 @Override
356 public File dumpStreamException(@Nonnull Throwable t) {
357 return new File("");
358 }
359
360 @Override
361 public void logWarningAtEnd(@Nonnull String text) {}
362
363 @Nonnull
364 @Override
365 public ConsoleLogger getConsoleLogger() {
366 return logger;
367 }
368
369 @Nonnull
370 @Override
371 public Object getConsoleLock() {
372 return logger;
373 }
374
375 @Override
376 public File getEventStreamBinaryFile() {
377 return null;
378 }
379
380 @Override
381 public File getCommandStreamBinaryFile() {
382 return null;
383 }
384 }
385 }