View Javadoc

1   package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import java.io.IOException;
23  import java.io.InputStream;
24  import java.util.Queue;
25  import java.util.concurrent.Semaphore;
26  
27  /**
28   * An {@link InputStream} that, when read, provides test class names out of a queue.
29   * <p/>
30   * The Stream provides only one test at a time, but only after {@link #provideNewTest()} has been invoked.
31   * <p/>
32   * After providing each test class name, followed by a newline character, a flush is performed on the
33   * {@link FlushReceiver} provided by the {@link FlushReceiverProvider} that can be set using
34   * {@link #setFlushReceiverProvider(FlushReceiverProvider)}.
35   * 
36   * @author Andreas Gudian
37   */
38  public class TestProvidingInputStream
39      extends InputStream
40  {
41      private final Queue<String> testItemQueue;
42  
43      private byte[] currentBuffer;
44  
45      private int currentPos;
46  
47      private Semaphore semaphore = new Semaphore( 0 );
48  
49      private FlushReceiverProvider flushReceiverProvider;
50  
51      private boolean closed = false;
52  
53      /**
54       * C'tor
55       * 
56       * @param testItemQueue source of the tests to be read from this stream
57       */
58      public TestProvidingInputStream( Queue<String> testItemQueue )
59      {
60          this.testItemQueue = testItemQueue;
61      }
62  
63      /**
64       * @param flushReceiverProvider the provider for a flush receiver.
65       */
66      public void setFlushReceiverProvider( FlushReceiverProvider flushReceiverProvider )
67      {
68          this.flushReceiverProvider = flushReceiverProvider;
69      }
70  
71      @Override
72      public synchronized int read()
73          throws IOException
74      {
75          if ( null == currentBuffer )
76          {
77              if ( null != flushReceiverProvider && null != flushReceiverProvider.getFlushReceiver() )
78              {
79                  flushReceiverProvider.getFlushReceiver().flush();
80              }
81  
82              semaphore.acquireUninterruptibly();
83  
84              if ( closed )
85              {
86                  return -1;
87              }
88  
89              String currentElement = testItemQueue.poll();
90              if ( null != currentElement )
91              {
92                  currentBuffer = currentElement.getBytes();
93                  currentPos = 0;
94              }
95              else
96              {
97                  return -1;
98              }
99          }
100 
101         if ( currentPos < currentBuffer.length )
102         {
103             return ( currentBuffer[currentPos++] & 0xff );
104         }
105         else
106         {
107             currentBuffer = null;
108             return ( '\n' & 0xff );
109         }
110     }
111 
112     /**
113      * Signal that a new test is to be provided.
114      */
115     public void provideNewTest()
116     {
117         semaphore.release();
118     }
119 
120     public void close()
121     {
122         closed = true;
123         semaphore.release();
124     }
125 }