1 package org.apache.maven.plugin.surefire.booterclient.lazytestprovider;
2
3 /*
4 * Licensed to the Apache Software Foundation (ASF) under one
5 * or more contributor license agreements. See the NOTICE file
6 * distributed with this work for additional information
7 * regarding copyright ownership. The ASF licenses this file
8 * to you under the Apache License, Version 2.0 (the
9 * "License"); you may not use this file except in compliance
10 * with the License. You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing,
15 * software distributed under the License is distributed on an
16 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17 * KIND, either express or implied. See the License for the
18 * specific language governing permissions and limitations
19 * under the License.
20 */
21
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.util.Queue;
25 import java.util.concurrent.Semaphore;
26
27 /**
28 * An {@link InputStream} that, when read, provides test class names out of a queue.
29 * <p/>
30 * The Stream provides only one test at a time, but only after {@link #provideNewTest()} has been invoked.
31 * <p/>
32 * After providing each test class name, followed by a newline character, a flush is performed on the
33 * {@link FlushReceiver} provided by the {@link FlushReceiverProvider} that can be set using
34 * {@link #setFlushReceiverProvider(FlushReceiverProvider)}.
35 *
36 * @author Andreas Gudian
37 */
38 public class TestProvidingInputStream
39 extends InputStream
40 {
41 private final Queue<String> testItemQueue;
42
43 private byte[] currentBuffer;
44
45 private int currentPos;
46
47 private Semaphore semaphore = new Semaphore( 0 );
48
49 private FlushReceiverProvider flushReceiverProvider;
50
51 private boolean closed = false;
52
53 /**
54 * C'tor
55 *
56 * @param testItemQueue source of the tests to be read from this stream
57 */
58 public TestProvidingInputStream( Queue<String> testItemQueue )
59 {
60 this.testItemQueue = testItemQueue;
61 }
62
63 /**
64 * @param flushReceiverProvider the provider for a flush receiver.
65 */
66 public void setFlushReceiverProvider( FlushReceiverProvider flushReceiverProvider )
67 {
68 this.flushReceiverProvider = flushReceiverProvider;
69 }
70
71 @Override
72 public synchronized int read()
73 throws IOException
74 {
75 if ( null == currentBuffer )
76 {
77 if ( null != flushReceiverProvider && null != flushReceiverProvider.getFlushReceiver() )
78 {
79 flushReceiverProvider.getFlushReceiver().flush();
80 }
81
82 semaphore.acquireUninterruptibly();
83
84 if ( closed )
85 {
86 return -1;
87 }
88
89 String currentElement = testItemQueue.poll();
90 if ( null != currentElement )
91 {
92 currentBuffer = currentElement.getBytes();
93 currentPos = 0;
94 }
95 else
96 {
97 return -1;
98 }
99 }
100
101 if ( currentPos < currentBuffer.length )
102 {
103 return ( currentBuffer[currentPos++] & 0xff );
104 }
105 else
106 {
107 currentBuffer = null;
108 return ( '\n' & 0xff );
109 }
110 }
111
112 /**
113 * Signal that a new test is to be provided.
114 */
115 public void provideNewTest()
116 {
117 semaphore.release();
118 }
119
120 public void close()
121 {
122 closed = true;
123 semaphore.release();
124 }
125 }