View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.extensions;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.io.Closeable;
24  import java.io.File;
25  import java.io.IOException;
26  import java.net.URI;
27  import java.nio.ByteBuffer;
28  import java.nio.channels.ReadableByteChannel;
29  import java.nio.channels.WritableByteChannel;
30  import java.util.UUID;
31  import java.util.concurrent.Callable;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.FutureTask;
34  import java.util.concurrent.atomic.AtomicInteger;
35  import java.util.concurrent.atomic.AtomicLong;
36  
37  import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer;
38  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
39  import org.apache.maven.plugin.surefire.log.api.NullConsoleLogger;
40  import org.apache.maven.surefire.api.booter.Command;
41  import org.apache.maven.surefire.api.event.Event;
42  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
43  import org.apache.maven.surefire.api.report.OutputReportEntry;
44  import org.apache.maven.surefire.api.report.TestOutputReceiver;
45  import org.apache.maven.surefire.api.report.TestOutputReportEntry;
46  import org.apache.maven.surefire.booter.spi.EventChannelEncoder;
47  import org.apache.maven.surefire.booter.spi.SurefireMasterProcessChannelProcessorFactory;
48  import org.apache.maven.surefire.extensions.CommandReader;
49  import org.apache.maven.surefire.extensions.EventHandler;
50  import org.apache.maven.surefire.extensions.util.CountdownCloseable;
51  import org.junit.Rule;
52  import org.junit.Test;
53  import org.junit.rules.ExpectedException;
54  
55  import static java.util.concurrent.TimeUnit.HOURS;
56  import static java.util.concurrent.TimeUnit.SECONDS;
57  import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
58  import static org.apache.maven.surefire.api.report.TestOutputReportEntry.stdOutln;
59  import static org.assertj.core.api.Assertions.assertThat;
60  import static org.junit.Assert.fail;
61  
62  /**
63   * Simulates the End To End use case where Maven process and Surefire process communicate using the TCP/IP protocol.
64   */
65  @SuppressWarnings("checkstyle:magicnumber")
66  public class E2ETest {
67      private static final String LONG_STRING =
68              "0123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789";
69  
70      @Rule
71      public final ExpectedException e = ExpectedException.none();
72  
73      @Test
74      public void endToEndTest() throws Exception {
75          ForkNodeArguments arguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
76  
77          final SurefireForkChannel server = new SurefireForkChannel(arguments);
78          server.tryConnectToClient();
79  
80          final String connection = server.getForkNodeConnectionString();
81  
82          final SurefireMasterProcessChannelProcessorFactory factory = new SurefireMasterProcessChannelProcessorFactory();
83          factory.connect(connection);
84          final EventChannelEncoder encoder = (EventChannelEncoder) factory.createEncoder(arguments);
85  
86          final CountDownLatch awaitHandlerFinished = new CountDownLatch(2);
87  
88          final AtomicLong readTime = new AtomicLong();
89  
90          final int totalCalls = 400_000; // 400_000; // 1_000_000; // 10_000_000;
91  
92          EventHandler<Event> h = new EventHandler<Event>() {
93              private final AtomicInteger counter = new AtomicInteger();
94              private volatile long t1;
95  
96              @Override
97              public void handleEvent(@Nonnull Event event) {
98                  try {
99                      if (counter.getAndIncrement() == 0) {
100                         t1 = System.currentTimeMillis();
101                     }
102 
103                     long t2 = System.currentTimeMillis();
104                     long spent = t2 - t1;
105 
106                     if (counter.get() == totalCalls - 64 * 1024) {
107                         readTime.set(spent);
108                         System.out.println(spent + "ms on read");
109                         awaitHandlerFinished.countDown();
110                     }
111                 } catch (Exception e) {
112                     e.printStackTrace();
113                 }
114             }
115         };
116 
117         EventHandler<Event> queue = new ThreadedStreamConsumer(h);
118 
119         System.gc();
120 
121         SECONDS.sleep(5L);
122 
123         server.bindEventHandler(queue, new CountdownCloseable(new DummyCloseable(), 1), new DummyReadableChannel());
124         server.bindCommandReader(new DummyCommandReader(), null);
125 
126         Thread t = new Thread() {
127             @Override
128             public void run() {
129                 TestOutputReceiver<OutputReportEntry> target = new TestOutputReceiver() {
130                     @Override
131                     public void writeTestOutput(OutputReportEntry reportEntry) {
132                         encoder.testOutput(stdOutln(reportEntry.getLog()));
133                     }
134                 };
135 
136                 // PrintStream out = System.out;
137                 // PrintStream err = System.err;
138 
139                 // ConsoleOutputCapture.startCapture( target );
140 
141                 try {
142                     long t1 = System.currentTimeMillis();
143                     for (int i = 0; i < totalCalls; i++) {
144                         // System.out.println( LONG_STRING );
145                         encoder.testOutput(new TestOutputReportEntry(stdOutln(LONG_STRING), NORMAL_RUN, 1L));
146                     }
147                     long t2 = System.currentTimeMillis();
148                     long spent = t2 - t1;
149                     // System.setOut( out );
150                     // System.setErr( err );
151                     System.out.println(spent + "ms on write");
152                     awaitHandlerFinished.countDown();
153                 } catch (Exception e) {
154                     e.printStackTrace();
155                 }
156             }
157         };
158         t.setDaemon(true);
159         t.start();
160 
161         assertThat(awaitHandlerFinished.await(30L, SECONDS)).isTrue();
162 
163         factory.close();
164         server.close();
165         // queue.close();
166 
167         // 1.0 seconds while using the encoder/decoder
168         assertThat(readTime.get())
169                 .describedAs("The performance test should assert 1.0s of read time. "
170                         + "The limit 6s guarantees that the read time does not exceed this limit on overloaded CPU.")
171                 .isPositive()
172                 .isLessThanOrEqualTo(6_000L);
173     }
174 
175     @Test(timeout = 10_000L)
176     public void shouldVerifyClient() throws Exception {
177         ForkNodeArguments forkNodeArguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
178 
179         try (SurefireForkChannel server = new SurefireForkChannel(forkNodeArguments);
180                 SurefireMasterProcessChannelProcessorFactory client =
181                         new SurefireMasterProcessChannelProcessorFactory()) {
182             FutureTask<String> task = new FutureTask<>(new Callable<String>() {
183                 @Override
184                 public String call() throws Exception {
185                     client.connect(server.getForkNodeConnectionString());
186                     return "client connected";
187                 }
188             });
189 
190             Thread t = new Thread(task);
191             t.setDaemon(true);
192             t.start();
193 
194             assertThat(task.get()).isEqualTo("client connected");
195         }
196     }
197 
198     @Test(timeout = 10_000L)
199     public void shouldNotVerifyClient() throws Exception {
200         ForkNodeArguments forkNodeArguments = new Arguments(UUID.randomUUID().toString(), 1, new NullConsoleLogger());
201 
202         try (SurefireForkChannel server = new SurefireForkChannel(forkNodeArguments);
203                 SurefireMasterProcessChannelProcessorFactory client =
204                         new SurefireMasterProcessChannelProcessorFactory()) {
205             FutureTask<String> task = new FutureTask<>(new Callable<String>() {
206                 @Override
207                 public String call() throws Exception {
208                     URI connectionUri = new URI(server.getForkNodeConnectionString());
209                     client.connect("tcp://" + connectionUri.getHost() + ":" + connectionUri.getPort()
210                             + "?sessionId=6ba7b812-9dad-11d1-80b4-00c04fd430c8");
211                     return "client connected";
212                 }
213             });
214 
215             Thread t = new Thread(task);
216             t.setDaemon(true);
217             t.start();
218 
219             e.expect(InvalidSessionIdException.class);
220             e.expectMessage("The actual sessionId '6ba7b812-9dad-11d1-80b4-00c04fd430c8' does not match '"
221                     + forkNodeArguments.getSessionId() + "'.");
222 
223             server.tryConnectToClient();
224             server.bindCommandReader(new DummyCommandReader(), new DummyWritableByteChannel());
225 
226             server.bindEventHandler(
227                     new DummyEventHandler(),
228                     new CountdownCloseable(new DummyCloseable(), 1),
229                     new DummyReadableChannel());
230 
231             fail(task.get());
232         }
233     }
234 
235     private static class DummyEventHandler<Event> implements EventHandler<Event> {
236         @Override
237         public void handleEvent(@Nonnull Event event) {}
238     }
239 
240     private static class DummyReadableChannel implements ReadableByteChannel {
241         private volatile Thread t;
242 
243         @Override
244         public int read(ByteBuffer dst) throws IOException {
245             try {
246                 t = Thread.currentThread();
247                 HOURS.sleep(1L);
248                 return 0;
249             } catch (InterruptedException e) {
250                 throw new IOException(e.getLocalizedMessage(), e);
251             }
252         }
253 
254         @Override
255         public boolean isOpen() {
256             return true;
257         }
258 
259         @Override
260         public void close() {
261             if (t != null) {
262                 t.interrupt();
263             }
264         }
265     }
266 
267     private static class DummyWritableByteChannel implements WritableByteChannel {
268         private volatile Thread t;
269 
270         @Override
271         public int write(ByteBuffer src) throws IOException {
272             try {
273                 t = Thread.currentThread();
274                 HOURS.sleep(1L);
275                 return 0;
276             } catch (InterruptedException e) {
277                 throw new IOException(e.getLocalizedMessage(), e);
278             }
279         }
280 
281         @Override
282         public boolean isOpen() {
283             return true;
284         }
285 
286         @Override
287         public void close() throws IOException {
288             if (t != null) {
289                 t.interrupt();
290             }
291         }
292     }
293 
294     private static class DummyCommandReader implements CommandReader {
295         private volatile Thread t;
296 
297         @Override
298         public Command readNextCommand() throws IOException {
299             try {
300                 t = Thread.currentThread();
301                 HOURS.sleep(1L);
302                 return null;
303             } catch (InterruptedException e) {
304                 throw new IOException(e.getLocalizedMessage(), e);
305             }
306         }
307 
308         @Override
309         public void close() {
310             if (t != null) {
311                 t.interrupt();
312             }
313         }
314 
315         @Override
316         public boolean isClosed() {
317             return false;
318         }
319     }
320 
321     private static class DummyCloseable implements Closeable {
322         @Override
323         public void close() {}
324     }
325 
326     private static class Arguments implements ForkNodeArguments {
327         private final String sessionId;
328         private final int id;
329         private final ConsoleLogger logger;
330 
331         private Arguments(String sessionId, int id, ConsoleLogger logger) {
332             this.sessionId = sessionId;
333             this.id = id;
334             this.logger = logger;
335         }
336 
337         @Nonnull
338         @Override
339         public String getSessionId() {
340             return sessionId;
341         }
342 
343         @Override
344         public int getForkChannelId() {
345             return id;
346         }
347 
348         @Nonnull
349         @Override
350         public File dumpStreamText(@Nonnull String text) {
351             return new File("");
352         }
353 
354         @Nonnull
355         @Override
356         public File dumpStreamException(@Nonnull Throwable t) {
357             return new File("");
358         }
359 
360         @Override
361         public void logWarningAtEnd(@Nonnull String text) {}
362 
363         @Nonnull
364         @Override
365         public ConsoleLogger getConsoleLogger() {
366             return logger;
367         }
368 
369         @Nonnull
370         @Override
371         public Object getConsoleLock() {
372             return logger;
373         }
374 
375         @Override
376         public File getEventStreamBinaryFile() {
377             return null;
378         }
379 
380         @Override
381         public File getCommandStreamBinaryFile() {
382             return null;
383         }
384     }
385 }