1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.extensions;
20
21 import javax.annotation.Nonnull;
22
23 import java.io.Closeable;
24 import java.io.File;
25 import java.io.IOException;
26 import java.net.InetAddress;
27 import java.net.Socket;
28 import java.net.URI;
29 import java.nio.ByteBuffer;
30 import java.nio.channels.ReadableByteChannel;
31 import java.util.Queue;
32 import java.util.UUID;
33 import java.util.concurrent.ConcurrentLinkedQueue;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.atomic.AtomicBoolean;
36
37 import org.apache.maven.plugin.surefire.booterclient.MockReporter;
38 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream;
39 import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStream.TestLessInputStreamBuilder;
40 import org.apache.maven.plugin.surefire.extensions.SurefireForkNodeFactory;
41 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
42 import org.apache.maven.surefire.api.event.ControlByeEvent;
43 import org.apache.maven.surefire.api.event.Event;
44 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
45 import org.apache.maven.surefire.extensions.util.CountdownCloseable;
46 import org.junit.Test;
47
48 import static java.nio.charset.StandardCharsets.US_ASCII;
49 import static java.util.concurrent.TimeUnit.MILLISECONDS;
50 import static org.assertj.core.api.Assertions.assertThat;
51 import static org.mockito.ArgumentMatchers.any;
52 import static org.mockito.Mockito.mock;
53 import static org.mockito.Mockito.when;
54
55
56
57
58 public class ForkChannelTest {
59 private static final long TESTCASE_TIMEOUT = 30_000L;
60
61 private final AtomicBoolean hasError = new AtomicBoolean();
62
63 @Test(timeout = TESTCASE_TIMEOUT)
64 public void shouldRequestReplyMessagesViaTCP() throws Exception {
65 final MockReporter reporter = new MockReporter();
66 final String sessionId = UUID.randomUUID().toString();
67 ForkNodeArguments forkNodeArguments = new ForkNodeArguments() {
68 @Override
69 public File getEventStreamBinaryFile() {
70 return null;
71 }
72
73 @Override
74 public File getCommandStreamBinaryFile() {
75 return null;
76 }
77
78 @Nonnull
79 @Override
80 public String getSessionId() {
81 return sessionId;
82 }
83
84 @Override
85 public int getForkChannelId() {
86 return 1;
87 }
88
89 @Override
90 @Nonnull
91 public File dumpStreamText(@Nonnull String text) {
92 return new File("");
93 }
94
95 @Nonnull
96 @Override
97 public File dumpStreamException(@Nonnull Throwable t) {
98 return new File("");
99 }
100
101 @Override
102 public void logWarningAtEnd(@Nonnull String text) {}
103
104 @Override
105 @Nonnull
106 public ConsoleLogger getConsoleLogger() {
107 return reporter;
108 }
109
110 @Nonnull
111 @Override
112 public Object getConsoleLock() {
113 return reporter;
114 }
115 };
116
117 ForkNodeFactory factory = new SurefireForkNodeFactory();
118 try (ForkChannel channel = factory.createForkChannel(forkNodeArguments)) {
119 assertThat(channel.getArguments().getForkChannelId()).isEqualTo(1);
120
121 assertThat(channel.getCountdownCloseablePermits()).isEqualTo(3);
122
123 String localHost = InetAddress.getLoopbackAddress().getHostAddress();
124 String connectionString = channel.getForkNodeConnectionString();
125 URI uri = new URI(connectionString);
126 assertThat(uri.getScheme()).isEqualTo("tcp");
127 String uriHost = uri.getHost();
128 if (uriHost.startsWith("[") && uriHost.endsWith("]")) {
129 uriHost = uriHost.substring(1, uriHost.length() - 1);
130 }
131 assertThat(uriHost).isEqualTo(localHost);
132 assertThat(uri.getPort()).isPositive();
133 assertThat(uri.getQuery()).isEqualTo("sessionId=" + sessionId);
134
135 final TestLessInputStreamBuilder builder = new TestLessInputStreamBuilder();
136 TestLessInputStream commandReader = builder.build();
137 final CountDownLatch isCloseableCalled = new CountDownLatch(1);
138 Closeable closeable = new Closeable() {
139 @Override
140 public void close() {
141 isCloseableCalled.countDown();
142 }
143 };
144 CountdownCloseable cc = new CountdownCloseable(closeable, 2);
145 Consumer consumer = new Consumer();
146
147 Client client = new Client(uri.getPort(), sessionId);
148 client.start();
149
150 channel.tryConnectToClient();
151 channel.bindCommandReader(commandReader, null);
152 ReadableByteChannel stdOut = mock(ReadableByteChannel.class);
153 when(stdOut.read(any(ByteBuffer.class))).thenReturn(-1);
154 channel.bindEventHandler(consumer, cc, stdOut);
155
156 commandReader.noop();
157
158 client.join(TESTCASE_TIMEOUT);
159
160 assertThat(hasError.get()).isFalse();
161
162 assertThat(isCloseableCalled.await(TESTCASE_TIMEOUT, MILLISECONDS)).isTrue();
163
164 assertThat(reporter.getEvents())
165 .describedAs("The decoder captured the list of stream errors: "
166 + reporter.getData().toString())
167 .isEmpty();
168
169 assertThat(consumer.lines).hasSize(1);
170
171 assertThat(consumer.lines.element()).isInstanceOf(ControlByeEvent.class);
172 }
173 }
174
175 private static class Consumer implements EventHandler<Event> {
176 final Queue<Event> lines = new ConcurrentLinkedQueue<>();
177
178 @Override
179 public void handleEvent(@Nonnull Event s) {
180 lines.add(s);
181 }
182 }
183
184 private final class Client extends Thread {
185 private final int port;
186 private final String sessionId;
187
188 private Client(int port, String sessionId) {
189 this.port = port;
190 this.sessionId = sessionId;
191 }
192
193 @Override
194 public void run() {
195 try (Socket socket = new Socket(InetAddress.getLoopbackAddress().getHostAddress(), port)) {
196 socket.getOutputStream().write(sessionId.getBytes(US_ASCII));
197 byte[] data = new byte[128];
198 int readLength = socket.getInputStream().read(data);
199 String token = new String(data, 0, readLength, US_ASCII);
200 assertThat(token).isEqualTo(":maven-surefire-command:\u0004:noop:");
201 socket.getOutputStream().write(":maven-surefire-event:\u0003:bye:".getBytes(US_ASCII));
202 } catch (IOException e) {
203 hasError.set(true);
204 e.printStackTrace();
205 throw new IllegalStateException(e);
206 } catch (RuntimeException e) {
207 hasError.set(true);
208 e.printStackTrace();
209 throw e;
210 }
211 }
212 }
213 }