1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.stream;
20
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.nio.charset.CodingErrorAction.REPLACE;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Arrays.copyOf;
import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50
51
52
53
54
55
56 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
57 public static final int BUFFER_SIZE = 1024;
58
59 private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
60
61 private static final String[] JVM_ERROR_PATTERNS = {
62 "could not create the java virtual machine",
63 "error occurred during initialization",
64 "error:",
65 "could not reserve enough space",
66 "could not allocate",
67 "unable to allocate",
68 "java.lang.module.findexception"
69 };
70
71 private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
72 DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
73
74 private static final int NO_POSITION = -1;
75 private static final int DELIMITER_LENGTH = 1;
76 private static final int BYTE_LENGTH = 1;
77 private static final int INT_LENGTH = 4;
78 private static final int LONG_LENGTH = 8;
79
80 private final ReadableByteChannel channel;
81 private final ForkNodeArguments arguments;
82 private final Map<Segment, MT> messageTypes;
83 private final ConsoleLogger logger;
84
85 protected AbstractStreamDecoder(
86 @Nonnull ReadableByteChannel channel,
87 @Nonnull ForkNodeArguments arguments,
88 @Nonnull Map<Segment, MT> messageTypes) {
89 this.channel = channel;
90 this.arguments = arguments;
91 this.messageTypes = messageTypes;
92 logger = arguments.getConsoleLogger();
93 }
94
95 public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
96
97 @Nonnull
98 protected abstract byte[] getEncodedMagicNumber();
99
100 @Nonnull
101 protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
102
103 @Nonnull
104 protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
105
106 @Nonnull
107 protected final ForkNodeArguments getArguments() {
108 return arguments;
109 }
110
111 protected void debugStream(byte[] array, int position, int remaining) {}
112
113 protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
114 byte[] header = getEncodedMagicNumber();
115
116
117
118
119
120 read(memento, DELIMITER_LENGTH);
121 ByteBuffer bb = memento.getByteBuffer();
122 if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != ':') {
123 checkHeader(memento);
124 }
125 checkDelimiter(memento);
126 for (int i = 0; i < header.length; i++) {
127 read(memento, DELIMITER_LENGTH);
128 bb = memento.getByteBuffer();
129 if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != (header[i] & 0xff)) {
130
131 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + 1);
132 }
133 bb.position(bb.position() + 1);
134 }
135 read(memento, DELIMITER_LENGTH);
136 checkDelimiter(memento);
137 return messageTypes.get(readSegment(memento));
138 }
139
140 @Nonnull
141 @SuppressWarnings("checkstyle:magicnumber")
142 protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
143 int readCount = readByte(memento) & 0xff;
144 read(memento, readCount + DELIMITER_LENGTH);
145 ByteBuffer bb = memento.getByteBuffer();
146 Segment segment = new Segment(bb.array(), bb.arrayOffset() + ((Buffer) bb).position(), readCount);
147 ((Buffer) bb).position(((Buffer) bb).position() + readCount);
148 checkDelimiter(memento);
149 return segment;
150 }
151
152 @Nonnull
153 @SuppressWarnings("checkstyle:magicnumber")
154 protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
155 int length = readByte(memento) & 0xff;
156 read(memento, length + DELIMITER_LENGTH);
157 ByteBuffer bb = memento.getByteBuffer();
158 byte[] array = bb.array();
159 int offset = bb.arrayOffset() + ((Buffer) bb).position();
160 ((Buffer) bb).position(((Buffer) bb).position() + length);
161 boolean isDefaultEncoding = false;
162 if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
163 isDefaultEncoding = true;
164 for (int i = 0; i < length; i++) {
165 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
166 }
167 }
168
169 try {
170 Charset charset = isDefaultEncoding
171 ? DEFAULT_STREAM_ENCODING
172 : Charset.forName(new String(array, offset, length, US_ASCII));
173
174 checkDelimiter(memento);
175 return charset;
176 } catch (IllegalArgumentException e) {
177 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
178 }
179 }
180
181 protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
182 ((Buffer) memento.getCharBuffer()).clear();
183 int readCount = readInt(memento);
184 if (readCount < 0) {
185 throw new MalformedFrameException(
186 memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
187 }
188 read(memento, readCount + DELIMITER_LENGTH);
189
190 final String string;
191 if (readCount == 0) {
192 string = "";
193 } else if (readCount == 1) {
194 read(memento, 1);
195 byte oneChar = memento.getByteBuffer().get();
196 string = oneChar == 0 ? null : String.valueOf((char) oneChar);
197 } else {
198 string = readString(memento, readCount);
199 }
200 read(memento, 1);
201 checkDelimiter(memento);
202 return string;
203 }
204
205 protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
206 read(memento, BYTE_LENGTH);
207 boolean isNullObject = memento.getByteBuffer().get() == 0;
208 if (isNullObject) {
209 read(memento, DELIMITER_LENGTH);
210 checkDelimiter(memento);
211 return null;
212 }
213 return readInt(memento);
214 }
215
216 protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
217 read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
218 byte b = memento.getByteBuffer().get();
219 checkDelimiter(memento);
220 return b;
221 }
222
223 protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
224 read(memento, INT_LENGTH + DELIMITER_LENGTH);
225 int i = memento.getByteBuffer().getInt();
226 checkDelimiter(memento);
227 return i;
228 }
229
230 protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
231 read(memento, BYTE_LENGTH);
232 boolean isNullObject = memento.getByteBuffer().get() == 0;
233 if (isNullObject) {
234 read(memento, DELIMITER_LENGTH);
235 checkDelimiter(memento);
236 return null;
237 }
238 return readLongPrivate(memento);
239 }
240
241 protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
242 read(memento, LONG_LENGTH + DELIMITER_LENGTH);
243 long num = memento.getByteBuffer().getLong();
244 checkDelimiter(memento);
245 return num;
246 }
247
248 @SuppressWarnings("checkstyle:magicnumber")
249 protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
250 ByteBuffer bb = memento.bb;
251 if ((0xff & bb.get()) != ':') {
252 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
253 }
254 }
255
256 protected final void checkHeader(Memento memento) throws MalformedFrameException {
257 ByteBuffer bb = memento.bb;
258
259 checkDelimiter(memento);
260
261 int shift = 0;
262 try {
263 byte[] header = getEncodedMagicNumber();
264 byte[] bbArray = bb.array();
265 for (int start = bb.arrayOffset() + ((Buffer) bb).position(), length = header.length;
266 shift < length;
267 shift++) {
268 if (bbArray[shift + start] != header[shift]) {
269 throw new MalformedFrameException(
270 memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position() + shift);
271 }
272 }
273 } finally {
274 ((Buffer) bb).position(((Buffer) bb).position() + shift);
275 }
276
277 checkDelimiter(memento);
278 }
279
280 protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
281 if (memento.getData().size() != expectedDataElements) {
282 throw new MalformedFrameException(
283 memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
284 }
285 }
286
287 private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
288 throws IOException, MalformedFrameException {
289 memento.getDecoder().reset();
290 final CharBuffer output = memento.getCharBuffer();
291 ((Buffer) output).clear();
292 final ByteBuffer input = memento.getByteBuffer();
293 final List<String> strings = new ArrayList<>();
294 int countDecodedBytes = 0;
295 for (boolean endOfInput = false; !endOfInput; ) {
296 final int bytesToRead = totalBytes - countDecodedBytes;
297 read(memento, bytesToRead);
298 int bytesToDecode = min(input.remaining(), bytesToRead);
299 final boolean isLastChunk = bytesToDecode == bytesToRead;
300 endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
301 do {
302 boolean endOfChunk = output.remaining() >= bytesToRead;
303 boolean endOfOutput = isLastChunk && endOfChunk;
304 int readInputBytes = decodeString(
305 memento.getDecoder(),
306 input,
307 output,
308 bytesToDecode,
309 endOfOutput,
310 memento.getLine().getPositionByteBuffer());
311 bytesToDecode -= readInputBytes;
312 countDecodedBytes += readInputBytes;
313 } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
314
315 strings.add(((Buffer) output).flip().toString());
316 ((Buffer) output).clear();
317 }
318
319 memento.getDecoder().reset();
320 ((Buffer) output).clear();
321
322 return toString(strings);
323 }
324
325 private static int decodeString(
326 @Nonnull CharsetDecoder decoder,
327 @Nonnull ByteBuffer input,
328 @Nonnull CharBuffer output,
329 @Nonnegative int bytesToDecode,
330 boolean endOfInput,
331 @Nonnegative int errorStreamFrom)
332 throws MalformedFrameException {
333 int limit = ((Buffer) input).limit();
334 ((Buffer) input).limit(((Buffer) input).position() + bytesToDecode);
335
336 CoderResult result = decoder.decode(input, output, endOfInput);
337 if (result.isError() || result.isMalformed()) {
338 throw new MalformedFrameException(errorStreamFrom, ((Buffer) input).position());
339 }
340
341 int decodedBytes = bytesToDecode - input.remaining();
342 ((Buffer) input).limit(limit);
343 return decodedBytes;
344 }
345
346 private static String toString(List<String> strings) {
347 if (strings.size() == 1) {
348 return strings.get(0);
349 }
350 StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
351 for (String s : strings) {
352 concatenated.append(s);
353 }
354 return concatenated.toString();
355 }
356
357 private void printCorruptedStream(Memento memento) {
358 ByteBuffer bb = memento.getByteBuffer();
359 if (bb.hasRemaining()) {
360 int bytesToWrite = bb.remaining();
361 memento.getLine().write(bb, ((Buffer) bb).position(), bytesToWrite);
362 ((Buffer) bb).position(((Buffer) bb).position() + bytesToWrite);
363 }
364 }
365
366
367
368
369
370
371 protected final void printRemainingStream(Memento memento) {
372 printCorruptedStream(memento);
373 memento.getLine().printExistingLine();
374 memento.getLine().clear();
375 }
376
377 public static final class Segment {
378 private final byte[] array;
379 private final int fromIndex;
380 private final int length;
381 private final int hashCode;
382
383 public Segment(byte[] array, int fromIndex, int length) {
384 this.array = array;
385 this.fromIndex = fromIndex;
386 this.length = length;
387
388 int hashCode = 0;
389 int i = fromIndex;
390 for (int loops = length >> 1; loops-- != 0; ) {
391 hashCode = 31 * hashCode + array[i++];
392 hashCode = 31 * hashCode + array[i++];
393 }
394 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
395 }
396
397 @Override
398 public int hashCode() {
399 return hashCode;
400 }
401
402 @Override
403 public boolean equals(Object obj) {
404 if (!(obj instanceof Segment)) {
405 return false;
406 }
407
408 Segment that = (Segment) obj;
409 if (that.length != length) {
410 return false;
411 }
412
413 for (int i = 0; i < length; i++) {
414 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
415 return false;
416 }
417 }
418 return true;
419 }
420 }
421
422 protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
423 ByteBuffer buffer = memento.getByteBuffer();
424 if (buffer.remaining() >= recommendedCount && ((Buffer) buffer).limit() != 0) {
425 return OVERFLOW;
426 } else {
427 if (((Buffer) buffer).position() != 0
428 && recommendedCount > buffer.capacity() - ((Buffer) buffer).position()) {
429 ((Buffer) buffer.compact()).flip();
430 memento.getLine().setPositionByteBuffer(0);
431 }
432 int mark = ((Buffer) buffer).position();
433 ((Buffer) buffer).position(((Buffer) buffer).limit());
434 ((Buffer) buffer).limit(min(((Buffer) buffer).position() + recommendedCount, buffer.capacity()));
435 return read(buffer, mark, recommendedCount);
436 }
437 }
438
439 private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
440 StreamReadStatus readStatus = null;
441 boolean isEnd = false;
442 try {
443 while (!isEnd
444 && ((Buffer) buffer).position() - oldPosition < recommendedCount
445 && ((Buffer) buffer).position() < ((Buffer) buffer).limit()) {
446 isEnd = channel.read(buffer) == -1;
447 }
448 } finally {
449 ((Buffer) buffer).limit(((Buffer) buffer).position());
450 ((Buffer) buffer).position(oldPosition);
451 int readBytes = buffer.remaining();
452 boolean readComplete = readBytes >= recommendedCount;
453 if (!isEnd || readComplete) {
454 debugStream(buffer.array(), buffer.arrayOffset() + ((Buffer) buffer).position(), buffer.remaining());
455 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
456 }
457 }
458
459 if (readStatus == null) {
460 throw new EOFException();
461 } else {
462 return readStatus;
463 }
464 }
465
466 public final class Memento {
467 private CharsetDecoder currentDecoder;
468 private final CharsetDecoder defaultDecoder;
469 private final BufferedStream line = new BufferedStream(32);
470 private final List<Object> data = new ArrayList<>();
471 private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
472 private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
473
474 public Memento() {
475 defaultDecoder = DEFAULT_STREAM_ENCODING
476 .newDecoder()
477 .onMalformedInput(REPLACE)
478 .onUnmappableCharacter(REPLACE);
479 ((Buffer) bb).limit(0);
480 }
481
482 public void reset() {
483 currentDecoder = null;
484 data.clear();
485 }
486
487 public CharsetDecoder getDecoder() {
488 return currentDecoder == null ? defaultDecoder : currentDecoder;
489 }
490
491 public void setCharset(Charset charset) {
492 if (charset.name().equals(defaultDecoder.charset().name())) {
493 currentDecoder = defaultDecoder;
494 } else {
495 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
496 }
497 }
498
499 public BufferedStream getLine() {
500 return line;
501 }
502
503 public List<Object> getData() {
504 return data;
505 }
506
507 public <T> T ofDataAt(int indexOfData) {
508
509 return (T) data.get(indexOfData);
510 }
511
512 public CharBuffer getCharBuffer() {
513 return cb;
514 }
515
516 public ByteBuffer getByteBuffer() {
517 return bb;
518 }
519 }
520
521
522
523
524 public final class BufferedStream {
525 private byte[] buffer;
526 private int count;
527 private int positionByteBuffer;
528 private boolean isNewLine;
529
530 BufferedStream(int capacity) {
531 this.buffer = new byte[capacity];
532 }
533
534 public int getPositionByteBuffer() {
535 return positionByteBuffer;
536 }
537
538 public void setPositionByteBuffer(int positionByteBuffer) {
539 this.positionByteBuffer = positionByteBuffer;
540 }
541
542 public void write(ByteBuffer bb, int position, int length) {
543 ensureCapacity(length);
544 byte[] array = bb.array();
545 int pos = bb.arrayOffset() + position;
546 while (length-- > 0) {
547 positionByteBuffer++;
548 byte b = array[pos++];
549 if (b == '\r' || b == '\n') {
550 if (!isNewLine) {
551 printExistingLine();
552 count = 0;
553 }
554 isNewLine = true;
555 } else {
556 buffer[count++] = b;
557 isNewLine = false;
558 }
559 }
560 }
561
562 public void clear() {
563 count = 0;
564 }
565
566 @Override
567 public String toString() {
568 return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
569 }
570
571 private boolean isEmpty() {
572 return count == 0;
573 }
574
575 private void ensureCapacity(int addCapacity) {
576 int oldCapacity = buffer.length;
577 int exactCapacity = count + addCapacity;
578 if (exactCapacity < 0) {
579 throw new OutOfMemoryError();
580 }
581
582 if (oldCapacity < exactCapacity) {
583 int newCapacity = oldCapacity << 1;
584 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
585 }
586 }
587
588 void printExistingLine() {
589 if (isEmpty()) {
590 return;
591 }
592
593 String s = toString();
594 if (isBlank(s)) {
595 return;
596 }
597
598 if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
599 if (logger.isDebugEnabled()) {
600 logger.debug(s);
601 } else if (logger.isInfoEnabled()) {
602 logger.info(s);
603 } else {
604
605 System.out.println(s);
606 }
607 } else {
608 if (isJvmError(s)) {
609 logger.error(s);
610 } else if (logger.isDebugEnabled()) {
611 logger.debug(s);
612 }
613
614 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
615 + arguments.getForkChannelId() + ".";
616 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
617 String dumpPath = dumpFile.getAbsolutePath();
618 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
619 }
620 }
621
622 private boolean isJvmError(String line) {
623 String lineLower = line.toLowerCase();
624 for (String errorPattern : JVM_ERROR_PATTERNS) {
625 if (lineLower.contains(errorPattern)) {
626 return true;
627 }
628 }
629 return false;
630 }
631 }
632
633 public static final class MalformedFrameException extends Exception {
634 private final int readFrom;
635 private final int readTo;
636
637 public MalformedFrameException(int readFrom, int readTo) {
638 this.readFrom = readFrom;
639 this.readTo = readTo;
640 }
641
642 public int readFrom() {
643 return readFrom;
644 }
645
646 public int readTo() {
647 return readTo;
648 }
649
650 public boolean hasValidPositions() {
651 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
652 }
653 }
654
655
656
657
658
659
660
661
662 public enum StreamReadStatus {
663 UNDERFLOW,
664 OVERFLOW,
665 EOF
666 }
667 }