View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.output;
20  
21  import javax.annotation.Nonnull;
22  
23  import java.util.concurrent.Callable;
24  import java.util.concurrent.CountDownLatch;
25  import java.util.concurrent.FutureTask;
26  
27  import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
28  import org.apache.maven.surefire.api.event.Event;
29  import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
30  import org.apache.maven.surefire.extensions.EventHandler;
31  import org.junit.Test;
32  
33  import static java.util.concurrent.TimeUnit.SECONDS;
34  import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
35  import static org.assertj.core.api.Assertions.assertThat;
36  
37  /**
38   *
39   */
40  @SuppressWarnings("checkstyle:magicnumber")
41  public class ThreadedStreamConsumerTest {
42      @Test
43      public void testQueueSynchronizer() throws Exception {
44          final CountDownLatch countDown = new CountDownLatch(5_000_000);
45          final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>(8 * 1024, null);
46  
47          Thread t = new Thread() {
48              @Override
49              public void run() {
50                  while (true) {
51                      try {
52                          sync.awaitNext();
53                          countDown.countDown();
54                      } catch (InterruptedException e) {
55                          throw new IllegalStateException(e);
56                      }
57                  }
58              }
59          };
60          t.setDaemon(true);
61          t.start();
62  
63          SECONDS.sleep(1);
64          System.gc();
65          SECONDS.sleep(2);
66  
67          long t1 = System.currentTimeMillis();
68  
69          for (int i = 0; i < 5_000_000; i++) {
70              sync.pushNext(i);
71          }
72  
73          assertThat(countDown.await(3L, SECONDS)).isTrue();
74  
75          long t2 = System.currentTimeMillis();
76          System.out.println((t2 - t1) + " millis in testQueueSynchronizer()");
77      }
78  
79      @Test
80      public void testThreadedStreamConsumer() throws Exception {
81          final CountDownLatch countDown = new CountDownLatch(5_000_000);
82          EventHandler<Event> handler = new EventHandler<Event>() {
83              @Override
84              public void handleEvent(@Nonnull Event event) {
85                  countDown.countDown();
86              }
87          };
88  
89          ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer(handler);
90  
91          SECONDS.sleep(1);
92          System.gc();
93          SECONDS.sleep(2);
94  
95          long t1 = System.currentTimeMillis();
96  
97          Event event = new StandardStreamOutWithNewLineEvent(NORMAL_RUN, 1L, "");
98          for (int i = 0; i < 5_000_000; i++) {
99              streamConsumer.handleEvent(event);
100         }
101 
102         assertThat(countDown.await(3L, SECONDS)).isTrue();
103 
104         long t2 = System.currentTimeMillis();
105         System.out.println((t2 - t1) + " millis in testThreadedStreamConsumer()");
106 
107         streamConsumer.close();
108     }
109 
110     @Test
111     public void testBasicStatus() throws Exception {
112         final QueueSynchronizer<String> sync = new QueueSynchronizer<>(2, null);
113         sync.pushNext("1");
114         sync.pushNext("2");
115         String s1 = sync.awaitNext();
116         String s2 = sync.awaitNext();
117         assertThat(s1).isEqualTo("1");
118         assertThat(s2).isEqualTo("2");
119         FutureTask<Void> future = new FutureTask<>(new Callable<Void>() {
120             @Override
121             public Void call() throws Exception {
122                 sync.awaitNext();
123                 return null;
124             }
125         });
126         Thread t = new Thread(future);
127         t.setDaemon(true);
128         t.start();
129         SECONDS.sleep(3L);
130         assertThat(t.getState()).isEqualTo(Thread.State.WAITING);
131     }
132 }