1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.stream;
20
21 import javax.annotation.Nonnegative;
22 import javax.annotation.Nonnull;
23
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.IOException;
27 import java.nio.ByteBuffer;
28 import java.nio.CharBuffer;
29 import java.nio.channels.ReadableByteChannel;
30 import java.nio.charset.Charset;
31 import java.nio.charset.CharsetDecoder;
32 import java.nio.charset.CoderResult;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39
40 import static java.lang.Math.max;
41 import static java.lang.Math.min;
42 import static java.nio.charset.CodingErrorAction.REPLACE;
43 import static java.nio.charset.StandardCharsets.US_ASCII;
44 import static java.util.Arrays.copyOf;
45 import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48 import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49
50
51
52
53
54
55 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
56 public static final int BUFFER_SIZE = 1024;
57
58 private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
59
60 private static final String[] JVM_ERROR_PATTERNS = {
61 "could not create the java virtual machine",
62 "error occurred during initialization",
63 "error:",
64 "could not reserve enough space",
65 "could not allocate",
66 "unable to allocate",
67 "java.lang.module.findexception"
68 };
69
70 private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
71 DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
72
73 private static final int NO_POSITION = -1;
74 private static final int DELIMITER_LENGTH = 1;
75 private static final int BYTE_LENGTH = 1;
76 private static final int INT_LENGTH = 4;
77 private static final int LONG_LENGTH = 8;
78
79 private final ReadableByteChannel channel;
80 private final ForkNodeArguments arguments;
81 private final Map<Segment, MT> messageTypes;
82 private final ConsoleLogger logger;
83
/**
 * Creates a decoder bound to a single channel.
 *
 * @param channel      channel the frames are read from
 * @param arguments    fork node arguments; its console logger is used for reporting
 * @param messageTypes lookup from an encoded segment to its message type
 */
protected AbstractStreamDecoder(
        @Nonnull ReadableByteChannel channel,
        @Nonnull ForkNodeArguments arguments,
        @Nonnull Map<Segment, MT> messageTypes) {
    this.logger = arguments.getConsoleLogger();
    this.messageTypes = messageTypes;
    this.arguments = arguments;
    this.channel = channel;
}
93
94 public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
95
96 @Nonnull
97 protected abstract byte[] getEncodedMagicNumber();
98
99 @Nonnull
100 protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
101
102 @Nonnull
103 protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104
105 @Nonnull
106 protected final ForkNodeArguments getArguments() {
107 return arguments;
108 }
109
110 protected void debugStream(byte[] array, int position, int remaining) {}
111
112 protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113 byte[] header = getEncodedMagicNumber();
114 int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
115 read(memento, readCount);
116 checkHeader(memento);
117 return messageTypes.get(readSegment(memento));
118 }
119
120 @Nonnull
121 @SuppressWarnings("checkstyle:magicnumber")
122 protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
123 int readCount = readByte(memento) & 0xff;
124 read(memento, readCount + DELIMITER_LENGTH);
125 ByteBuffer bb = memento.getByteBuffer();
126 Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
127 bb.position(bb.position() + readCount);
128 checkDelimiter(memento);
129 return segment;
130 }
131
132 @Nonnull
133 @SuppressWarnings("checkstyle:magicnumber")
134 protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
135 int length = readByte(memento) & 0xff;
136 read(memento, length + DELIMITER_LENGTH);
137 ByteBuffer bb = memento.getByteBuffer();
138 byte[] array = bb.array();
139 int offset = bb.arrayOffset() + bb.position();
140 bb.position(bb.position() + length);
141 boolean isDefaultEncoding = false;
142 if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
143 isDefaultEncoding = true;
144 for (int i = 0; i < length; i++) {
145 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
146 }
147 }
148
149 try {
150 Charset charset = isDefaultEncoding
151 ? DEFAULT_STREAM_ENCODING
152 : Charset.forName(new String(array, offset, length, US_ASCII));
153
154 checkDelimiter(memento);
155 return charset;
156 } catch (IllegalArgumentException e) {
157 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
158 }
159 }
160
161 protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
162 (memento.getCharBuffer()).clear();
163 int readCount = readInt(memento);
164 if (readCount < 0) {
165 throw new MalformedFrameException(
166 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
167 }
168 read(memento, readCount + DELIMITER_LENGTH);
169
170 final String string;
171 if (readCount == 0) {
172 string = "";
173 } else if (readCount == 1) {
174 read(memento, 1);
175 byte oneChar = memento.getByteBuffer().get();
176 string = oneChar == 0 ? null : String.valueOf((char) oneChar);
177 } else {
178 string = readString(memento, readCount);
179 }
180 read(memento, 1);
181 checkDelimiter(memento);
182 return string;
183 }
184
185 protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
186 read(memento, BYTE_LENGTH);
187 boolean isNullObject = memento.getByteBuffer().get() == 0;
188 if (isNullObject) {
189 read(memento, DELIMITER_LENGTH);
190 checkDelimiter(memento);
191 return null;
192 }
193 return readInt(memento);
194 }
195
196 protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
197 read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
198 byte b = memento.getByteBuffer().get();
199 checkDelimiter(memento);
200 return b;
201 }
202
203 protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
204 read(memento, INT_LENGTH + DELIMITER_LENGTH);
205 int i = memento.getByteBuffer().getInt();
206 checkDelimiter(memento);
207 return i;
208 }
209
210 protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
211 read(memento, BYTE_LENGTH);
212 boolean isNullObject = memento.getByteBuffer().get() == 0;
213 if (isNullObject) {
214 read(memento, DELIMITER_LENGTH);
215 checkDelimiter(memento);
216 return null;
217 }
218 return readLongPrivate(memento);
219 }
220
221 protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
222 read(memento, LONG_LENGTH + DELIMITER_LENGTH);
223 long num = memento.getByteBuffer().getLong();
224 checkDelimiter(memento);
225 return num;
226 }
227
228 @SuppressWarnings("checkstyle:magicnumber")
229 protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
230 ByteBuffer bb = memento.bb;
231 if ((0xff & bb.get()) != ':') {
232 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
233 }
234 }
235
236 protected final void checkHeader(Memento memento) throws MalformedFrameException {
237 ByteBuffer bb = memento.bb;
238
239 checkDelimiter(memento);
240
241 int shift = 0;
242 try {
243 byte[] header = getEncodedMagicNumber();
244 byte[] bbArray = bb.array();
245 for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
246 if (bbArray[shift + start] != header[shift]) {
247 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
248 }
249 }
250 } finally {
251 bb.position(bb.position() + shift);
252 }
253
254 checkDelimiter(memento);
255 }
256
257 protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
258 if (memento.getData().size() != expectedDataElements) {
259 throw new MalformedFrameException(
260 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
261 }
262 }
263
264 private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
265 throws IOException, MalformedFrameException {
266 memento.getDecoder().reset();
267 final CharBuffer output = memento.getCharBuffer();
268 (output).clear();
269 final ByteBuffer input = memento.getByteBuffer();
270 final List<String> strings = new ArrayList<>();
271 int countDecodedBytes = 0;
272 for (boolean endOfInput = false; !endOfInput; ) {
273 final int bytesToRead = totalBytes - countDecodedBytes;
274 read(memento, bytesToRead);
275 int bytesToDecode = min(input.remaining(), bytesToRead);
276 final boolean isLastChunk = bytesToDecode == bytesToRead;
277 endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
278 do {
279 boolean endOfChunk = output.remaining() >= bytesToRead;
280 boolean endOfOutput = isLastChunk && endOfChunk;
281 int readInputBytes = decodeString(
282 memento.getDecoder(),
283 input,
284 output,
285 bytesToDecode,
286 endOfOutput,
287 memento.getLine().getPositionByteBuffer());
288 bytesToDecode -= readInputBytes;
289 countDecodedBytes += readInputBytes;
290 } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
291
292 strings.add((output).flip().toString());
293 (output).clear();
294 }
295
296 memento.getDecoder().reset();
297 (output).clear();
298
299 return toString(strings);
300 }
301
302 private static int decodeString(
303 @Nonnull CharsetDecoder decoder,
304 @Nonnull ByteBuffer input,
305 @Nonnull CharBuffer output,
306 @Nonnegative int bytesToDecode,
307 boolean endOfInput,
308 @Nonnegative int errorStreamFrom)
309 throws MalformedFrameException {
310 int limit = (input).limit();
311 (input).limit((input).position() + bytesToDecode);
312
313 CoderResult result = decoder.decode(input, output, endOfInput);
314 if (result.isError() || result.isMalformed()) {
315 throw new MalformedFrameException(errorStreamFrom, (input).position());
316 }
317
318 int decodedBytes = bytesToDecode - input.remaining();
319 (input).limit(limit);
320 return decodedBytes;
321 }
322
323 private static String toString(List<String> strings) {
324 if (strings.size() == 1) {
325 return strings.get(0);
326 }
327 StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
328 for (String s : strings) {
329 concatenated.append(s);
330 }
331 return concatenated.toString();
332 }
333
334 private void printCorruptedStream(Memento memento) {
335 ByteBuffer bb = memento.getByteBuffer();
336 if (bb.hasRemaining()) {
337 int bytesToWrite = bb.remaining();
338 memento.getLine().write(bb, bb.position(), bytesToWrite);
339 bb.position(bb.position() + bytesToWrite);
340 }
341 }
342
343
344
345
346
347
348 protected final void printRemainingStream(Memento memento) {
349 printCorruptedStream(memento);
350 memento.getLine().printExistingLine();
351 memento.getLine().clear();
352 }
353
354 public static final class Segment {
355 private final byte[] array;
356 private final int fromIndex;
357 private final int length;
358 private final int hashCode;
359
360 public Segment(byte[] array, int fromIndex, int length) {
361 this.array = array;
362 this.fromIndex = fromIndex;
363 this.length = length;
364
365 int hashCode = 0;
366 int i = fromIndex;
367 for (int loops = length >> 1; loops-- != 0; ) {
368 hashCode = 31 * hashCode + array[i++];
369 hashCode = 31 * hashCode + array[i++];
370 }
371 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
372 }
373
374 @Override
375 public int hashCode() {
376 return hashCode;
377 }
378
379 @Override
380 public boolean equals(Object obj) {
381 if (!(obj instanceof Segment)) {
382 return false;
383 }
384
385 Segment that = (Segment) obj;
386 if (that.length != length) {
387 return false;
388 }
389
390 for (int i = 0; i < length; i++) {
391 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
392 return false;
393 }
394 }
395 return true;
396 }
397 }
398
399 protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
400 ByteBuffer buffer = memento.getByteBuffer();
401 if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
402 return OVERFLOW;
403 } else {
404 if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
405 (buffer.compact()).flip();
406 memento.getLine().setPositionByteBuffer(0);
407 }
408 int mark = buffer.position();
409 buffer.position(buffer.limit());
410 buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
411 return read(buffer, mark, recommendedCount);
412 }
413 }
414
415 private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
416 StreamReadStatus readStatus = null;
417 boolean isEnd = false;
418 try {
419 while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
420 isEnd = channel.read(buffer) == -1;
421 }
422 } finally {
423 buffer.limit(buffer.position());
424 buffer.position(oldPosition);
425 int readBytes = buffer.remaining();
426 boolean readComplete = readBytes >= recommendedCount;
427 if (!isEnd || readComplete) {
428 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
429 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
430 }
431 }
432
433 if (readStatus == null) {
434 throw new EOFException();
435 } else {
436 return readStatus;
437 }
438 }
439
440 public final class Memento {
441 private CharsetDecoder currentDecoder;
442 private final CharsetDecoder defaultDecoder;
443 private final BufferedStream line = new BufferedStream(32);
444 private final List<Object> data = new ArrayList<>();
445 private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
446 private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
447
448 public Memento() {
449 defaultDecoder = DEFAULT_STREAM_ENCODING
450 .newDecoder()
451 .onMalformedInput(REPLACE)
452 .onUnmappableCharacter(REPLACE);
453 bb.limit(0);
454 }
455
456 public void reset() {
457 currentDecoder = null;
458 data.clear();
459 }
460
461 public CharsetDecoder getDecoder() {
462 return currentDecoder == null ? defaultDecoder : currentDecoder;
463 }
464
465 public void setCharset(Charset charset) {
466 if (charset.name().equals(defaultDecoder.charset().name())) {
467 currentDecoder = defaultDecoder;
468 } else {
469 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
470 }
471 }
472
473 public BufferedStream getLine() {
474 return line;
475 }
476
477 public List<Object> getData() {
478 return data;
479 }
480
481 public <T> T ofDataAt(int indexOfData) {
482
483 return (T) data.get(indexOfData);
484 }
485
486 public CharBuffer getCharBuffer() {
487 return cb;
488 }
489
490 public ByteBuffer getByteBuffer() {
491 return bb;
492 }
493 }
494
495
496
497
498 public final class BufferedStream {
499 private byte[] buffer;
500 private int count;
501 private int positionByteBuffer;
502 private boolean isNewLine;
503
504 BufferedStream(int capacity) {
505 this.buffer = new byte[capacity];
506 }
507
508 public int getPositionByteBuffer() {
509 return positionByteBuffer;
510 }
511
512 public void setPositionByteBuffer(int positionByteBuffer) {
513 this.positionByteBuffer = positionByteBuffer;
514 }
515
516 public void write(ByteBuffer bb, int position, int length) {
517 ensureCapacity(length);
518 byte[] array = bb.array();
519 int pos = bb.arrayOffset() + position;
520 while (length-- > 0) {
521 positionByteBuffer++;
522 byte b = array[pos++];
523 if (b == '\r' || b == '\n') {
524 if (!isNewLine) {
525 printExistingLine();
526 count = 0;
527 }
528 isNewLine = true;
529 } else {
530 buffer[count++] = b;
531 isNewLine = false;
532 }
533 }
534 }
535
536 public void clear() {
537 count = 0;
538 }
539
540 @Override
541 public String toString() {
542 return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
543 }
544
545 private boolean isEmpty() {
546 return count == 0;
547 }
548
549 private void ensureCapacity(int addCapacity) {
550 int oldCapacity = buffer.length;
551 int exactCapacity = count + addCapacity;
552 if (exactCapacity < 0) {
553 throw new OutOfMemoryError();
554 }
555
556 if (oldCapacity < exactCapacity) {
557 int newCapacity = oldCapacity << 1;
558 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
559 }
560 }
561
562 void printExistingLine() {
563 if (isEmpty()) {
564 return;
565 }
566
567 String s = toString();
568 if (isBlank(s)) {
569 return;
570 }
571
572 if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
573 if (logger.isDebugEnabled()) {
574 logger.debug(s);
575 } else if (logger.isInfoEnabled()) {
576 logger.info(s);
577 } else {
578
579 System.out.println(s);
580 }
581 } else {
582 if (isJvmError(s)) {
583 logger.error(s);
584 } else if (logger.isDebugEnabled()) {
585 logger.debug(s);
586 }
587
588 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
589 + arguments.getForkChannelId() + ".";
590 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
591 String dumpPath = dumpFile.getAbsolutePath();
592 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
593 }
594 }
595
596 private boolean isJvmError(String line) {
597 String lineLower = line.toLowerCase();
598 for (String errorPattern : JVM_ERROR_PATTERNS) {
599 if (lineLower.contains(errorPattern)) {
600 return true;
601 }
602 }
603 return false;
604 }
605 }
606
607 public static final class MalformedFrameException extends Exception {
608 private final int readFrom;
609 private final int readTo;
610
611 public MalformedFrameException(int readFrom, int readTo) {
612 this.readFrom = readFrom;
613 this.readTo = readTo;
614 }
615
616 public int readFrom() {
617 return readFrom;
618 }
619
620 public int readTo() {
621 return readTo;
622 }
623
624 public boolean hasValidPositions() {
625 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
626 }
627 }
628
629
630
631
632
633
634
635
/**
 * Outcome of filling the byte buffer from the channel.
 */
public enum StreamReadStatus {
    /** Fewer bytes than requested are buffered and the channel has not ended. */
    UNDERFLOW,

    /** At least the requested number of bytes is buffered. */
    OVERFLOW,

    /** End of the channel; the read methods of this decoder throw {@code EOFException} instead. */
    EOF
}
641 }