View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
20  
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.lang.Thread.State;
24  import java.util.ArrayDeque;
25  import java.util.Queue;
26  import java.util.concurrent.Callable;
27  import java.util.concurrent.ConcurrentLinkedQueue;
28  import java.util.concurrent.FutureTask;
29  import java.util.concurrent.TimeUnit;
30  
31  import org.apache.maven.surefire.api.booter.Command;
32  import org.apache.maven.surefire.api.booter.MasterProcessChannelDecoder;
33  import org.apache.maven.surefire.booter.ForkedNodeArg;
34  import org.apache.maven.surefire.booter.spi.CommandChannelDecoder;
35  import org.junit.Test;
36  
37  import static java.nio.channels.Channels.newChannel;
38  import static java.nio.charset.StandardCharsets.UTF_8;
39  import static org.apache.maven.surefire.api.booter.Command.TEST_SET_FINISHED;
40  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.BYE_ACK;
41  import static org.apache.maven.surefire.api.booter.MasterProcessCommand.NOOP;
42  import static org.hamcrest.MatcherAssert.assertThat;
43  import static org.hamcrest.Matchers.is;
44  import static org.hamcrest.Matchers.notNullValue;
45  import static org.hamcrest.Matchers.nullValue;
46  import static org.junit.Assert.assertTrue;
47  import static org.junit.Assert.fail;
48  
49  /**
50   * Asserts that this stream properly reads bytes from queue.
51   *
52   * @author <a href="mailto:tibordigana@apache.org">Tibor Digana (tibor17)</a>
53   * @since 2.19
54   */
55  public class TestProvidingInputStreamTest {
56      private static final int WAIT_LOOPS = 100;
57  
58      @Test
59      public void closedStreamShouldReturnNullAsEndOfStream() throws IOException {
60          Queue<String> commands = new ArrayDeque<>();
61          TestProvidingInputStream is = new TestProvidingInputStream(commands);
62          is.close();
63          assertThat(is.readNextCommand(), is(nullValue()));
64      }
65  
66      @Test
67      public void emptyStreamShouldWaitUntilClosed() throws Exception {
68          Queue<String> commands = new ArrayDeque<>();
69          final TestProvidingInputStream is = new TestProvidingInputStream(commands);
70          final Thread streamThread = Thread.currentThread();
71          FutureTask<State> futureTask = new FutureTask<>(new Callable<State>() {
72              @Override
73              public State call() {
74                  sleep(1000L);
75                  State state = streamThread.getState();
76                  is.close();
77                  return state;
78              }
79          });
80          Thread assertionThread = new Thread(futureTask);
81          assertionThread.start();
82          assertThat(is.readNextCommand(), is(nullValue()));
83          State state = futureTask.get();
84          assertThat(state, is(State.WAITING));
85      }
86  
87      @Test
88      public void finishedTestsetShouldNotBlock() throws IOException {
89          final TestProvidingInputStream is = new TestProvidingInputStream(new ArrayDeque<String>());
90          is.testSetFinished();
91          new Thread(new Runnable() {
92                      @Override
93                      public void run() {
94                          is.provideNewTest();
95                      }
96                  })
97                  .start();
98  
99          for (int i = 0; i < 2; i++) {
100             Command cmd = is.readNextCommand();
101             assertThat(cmd.getData(), is(nullValue()));
102             assertThat(cmd, is(TEST_SET_FINISHED));
103         }
104 
105         boolean emptyStream = isInputStreamEmpty(is);
106 
107         is.close();
108         assertTrue(emptyStream);
109         assertThat(is.readNextCommand(), is(nullValue()));
110     }
111 
112     @Test
113     public void shouldReadTest() throws IOException {
114         Queue<String> commands = new ArrayDeque<>();
115         commands.add("Test");
116         final TestProvidingInputStream is = new TestProvidingInputStream(commands);
117         new Thread(new Runnable() {
118                     @Override
119                     public void run() {
120                         is.provideNewTest();
121                     }
122                 })
123                 .start();
124 
125         Command cmd = is.readNextCommand();
126         assertThat(cmd.getData(), is("Test"));
127 
128         is.close();
129     }
130 
131     @Test
132     public void shouldDecodeTwoCommands() throws IOException {
133         final TestProvidingInputStream pluginIs = new TestProvidingInputStream(new ConcurrentLinkedQueue<String>());
134         InputStream is = new InputStream() {
135             private byte[] buffer;
136             private int idx;
137 
138             @Override
139             public int read() throws IOException {
140                 if (buffer == null) {
141                     idx = 0;
142                     Command cmd = pluginIs.readNextCommand();
143                     if (cmd != null) {
144                         if (cmd.getCommandType() == BYE_ACK) {
145                             buffer = ":maven-surefire-command:\u0007:bye-ack:".getBytes(UTF_8);
146                         } else if (cmd.getCommandType() == NOOP) {
147                             buffer = ":maven-surefire-command:\u0004:noop:".getBytes(UTF_8);
148                         } else {
149                             fail();
150                         }
151                     }
152                 }
153 
154                 if (buffer != null) {
155                     byte b = buffer[idx++];
156                     if (idx == buffer.length) {
157                         buffer = null;
158                         idx = 0;
159                     }
160                     return b;
161                 }
162                 throw new IOException();
163             }
164         };
165         MasterProcessChannelDecoder decoder = new CommandChannelDecoder(newChannel(is), new ForkedNodeArg(1, false));
166         pluginIs.acknowledgeByeEventReceived();
167         pluginIs.noop();
168         Command bye = decoder.decode();
169         assertThat(bye, is(notNullValue()));
170         assertThat(bye.getCommandType(), is(BYE_ACK));
171         Command noop = decoder.decode();
172         assertThat(noop, is(notNullValue()));
173         assertThat(noop.getCommandType(), is(NOOP));
174     }
175 
176     private static void sleep(long millis) {
177         try {
178             TimeUnit.MILLISECONDS.sleep(millis);
179         } catch (InterruptedException e) {
180             // do nothing
181         }
182     }
183 
184     /**
185      * Waiting (max of 20 seconds)
186      * @param is examined stream
187      * @return {@code true} if the {@link InputStream#read()} is waiting for a new byte.
188      */
189     private static boolean isInputStreamEmpty(final TestProvidingInputStream is) {
190         Thread t = new Thread(new Runnable() {
191             @Override
192             public void run() {
193                 try {
194                     is.readNextCommand();
195                 } catch (IOException e) {
196                     Throwable cause = e.getCause();
197                     Throwable err = cause == null ? e : cause;
198                     if (!(err instanceof InterruptedException)) {
199                         System.err.println(err.toString());
200                     }
201                 }
202             }
203         });
204         t.start();
205         State state;
206         int loops = 0;
207         do {
208             sleep(100L);
209             state = t.getState();
210         } while (state == State.NEW && loops++ < WAIT_LOOPS);
211         t.interrupt();
212         return state == State.WAITING || state == State.TIMED_WAITING;
213     }
214 }