1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.util.Queue;
25 import java.util.concurrent.Semaphore;
26
27 /**
28 * An {@link InputStream} that, when read, provides test class names out of a queue.
29 * <p/>
30 * The Stream provides only one test at a time, but only after {@link #provideNewTest()} has been invoked.
31 * <p/>
32 * After providing each test class name, followed by a newline character, a flush is performed on the
33 * {@link FlushReceiver} provided by the {@link FlushReceiverProvider} that can be set using
34 * {@link #setFlushReceiverProvider(FlushReceiverProvider)}.
35 *
36 * @author Andreas Gudian
37 */
38 public class TestProvidingInputStream
39 extends InputStream
40 {
41 private final Queue<String> testItemQueue;
42
43 private byte[] currentBuffer;
44
45 private int currentPos;
46
47 private Semaphore semaphore = new Semaphore( 0 );
48
49 private FlushReceiverProvider flushReceiverProvider;
50
51 private boolean closed = false;
52
53 /**
54 * C'tor
55 *
56 * @param testItemQueue source of the tests to be read from this stream
57 */
58 public TestProvidingInputStream( Queue<String> testItemQueue )
59 {
60 this.testItemQueue = testItemQueue;
61 }
62
63 /**
64 * @param flushReceiverProvider the provider for a flush receiver.
65 */
66 public void setFlushReceiverProvider( FlushReceiverProvider flushReceiverProvider )
67 {
68 this.flushReceiverProvider = flushReceiverProvider;
69 }
70
71 @SuppressWarnings( "checkstyle:magicnumber" )
72 @Override
73 public synchronized int read()
74 throws IOException
75 {
76 if ( null == currentBuffer )
77 {
78 if ( null != flushReceiverProvider && null != flushReceiverProvider.getFlushReceiver() )
79 {
80 flushReceiverProvider.getFlushReceiver().flush();
81 }
82
83 semaphore.acquireUninterruptibly();
84
85 if ( closed )
86 {
87 return -1;
88 }
89
90 String currentElement = testItemQueue.poll();
91 if ( null != currentElement )
92 {
93 currentBuffer = currentElement.getBytes();
94 currentPos = 0;
95 }
96 else
97 {
98 return -1;
99 }
100 }
101
102 if ( currentPos < currentBuffer.length )
103 {
104 return ( currentBuffer[currentPos++] & 0xff );
105 }
106 else
107 {
108 currentBuffer = null;
109 return ( '\n' & 0xff );
110 }
111 }
112
113 /**
114 * Signal that a new test is to be provided.
115 */
116 public void provideNewTest()
117 {
118 semaphore.release();
119 }
120
121 @Override
122 public void close()
123 {
124 closed = true;
125 semaphore.release();
126 }
127 }