1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.stream;
20
21 import javax.annotation.Nonnegative;
22 import javax.annotation.Nonnull;
23
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.IOException;
27 import java.nio.Buffer;
28 import java.nio.ByteBuffer;
29 import java.nio.CharBuffer;
30 import java.nio.channels.ReadableByteChannel;
31 import java.nio.charset.Charset;
32 import java.nio.charset.CharsetDecoder;
33 import java.nio.charset.CoderResult;
34 import java.util.ArrayList;
35 import java.util.List;
36 import java.util.Map;
37
38 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
39 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
40
41 import static java.lang.Math.max;
42 import static java.lang.Math.min;
43 import static java.nio.charset.CodingErrorAction.REPLACE;
44 import static java.nio.charset.StandardCharsets.US_ASCII;
45 import static java.util.Arrays.copyOf;
46 import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
47 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
48 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
49 import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50
51
52
53
54
55
56 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
57 public static final int BUFFER_SIZE = 1024;
58
59 private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
60
61 private static final String[] JVM_ERROR_PATTERNS = {
62 "could not create the java virtual machine",
63 "error occurred during initialization",
64 "error:",
65 "could not reserve enough space",
66 "could not allocate",
67 "unable to allocate",
68 "java.lang.module.findexception"
69 };
70
71 private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
72 DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
73
74 private static final int NO_POSITION = -1;
75 private static final int DELIMITER_LENGTH = 1;
76 private static final int BYTE_LENGTH = 1;
77 private static final int INT_LENGTH = 4;
78 private static final int LONG_LENGTH = 8;
79
80 private final ReadableByteChannel channel;
81 private final ForkNodeArguments arguments;
82 private final Map<Segment, MT> messageTypes;
83 private final ConsoleLogger logger;
84
85 protected AbstractStreamDecoder(
86 @Nonnull ReadableByteChannel channel,
87 @Nonnull ForkNodeArguments arguments,
88 @Nonnull Map<Segment, MT> messageTypes) {
89 this.channel = channel;
90 this.arguments = arguments;
91 this.messageTypes = messageTypes;
92 logger = arguments.getConsoleLogger();
93 }
94
95 public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
96
97 @Nonnull
98 protected abstract byte[] getEncodedMagicNumber();
99
100 @Nonnull
101 protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
102
103 @Nonnull
104 protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
105
106 @Nonnull
107 protected final ForkNodeArguments getArguments() {
108 return arguments;
109 }
110
111 protected void debugStream(byte[] array, int position, int remaining) {}
112
113 protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
114 byte[] header = getEncodedMagicNumber();
115 int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
116 read(memento, readCount);
117 checkHeader(memento);
118 return messageTypes.get(readSegment(memento));
119 }
120
121 @Nonnull
122 @SuppressWarnings("checkstyle:magicnumber")
123 protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
124 int readCount = readByte(memento) & 0xff;
125 read(memento, readCount + DELIMITER_LENGTH);
126 ByteBuffer bb = memento.getByteBuffer();
127 Segment segment = new Segment(bb.array(), bb.arrayOffset() + ((Buffer) bb).position(), readCount);
128 ((Buffer) bb).position(((Buffer) bb).position() + readCount);
129 checkDelimiter(memento);
130 return segment;
131 }
132
133 @Nonnull
134 @SuppressWarnings("checkstyle:magicnumber")
135 protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
136 int length = readByte(memento) & 0xff;
137 read(memento, length + DELIMITER_LENGTH);
138 ByteBuffer bb = memento.getByteBuffer();
139 byte[] array = bb.array();
140 int offset = bb.arrayOffset() + ((Buffer) bb).position();
141 ((Buffer) bb).position(((Buffer) bb).position() + length);
142 boolean isDefaultEncoding = false;
143 if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
144 isDefaultEncoding = true;
145 for (int i = 0; i < length; i++) {
146 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
147 }
148 }
149
150 try {
151 Charset charset = isDefaultEncoding
152 ? DEFAULT_STREAM_ENCODING
153 : Charset.forName(new String(array, offset, length, US_ASCII));
154
155 checkDelimiter(memento);
156 return charset;
157 } catch (IllegalArgumentException e) {
158 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
159 }
160 }
161
162 protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
163 ((Buffer) memento.getCharBuffer()).clear();
164 int readCount = readInt(memento);
165 if (readCount < 0) {
166 throw new MalformedFrameException(
167 memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
168 }
169 read(memento, readCount + DELIMITER_LENGTH);
170
171 final String string;
172 if (readCount == 0) {
173 string = "";
174 } else if (readCount == 1) {
175 read(memento, 1);
176 byte oneChar = memento.getByteBuffer().get();
177 string = oneChar == 0 ? null : String.valueOf((char) oneChar);
178 } else {
179 string = readString(memento, readCount);
180 }
181 read(memento, 1);
182 checkDelimiter(memento);
183 return string;
184 }
185
186 protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
187 read(memento, BYTE_LENGTH);
188 boolean isNullObject = memento.getByteBuffer().get() == 0;
189 if (isNullObject) {
190 read(memento, DELIMITER_LENGTH);
191 checkDelimiter(memento);
192 return null;
193 }
194 return readInt(memento);
195 }
196
197 protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
198 read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
199 byte b = memento.getByteBuffer().get();
200 checkDelimiter(memento);
201 return b;
202 }
203
204 protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
205 read(memento, INT_LENGTH + DELIMITER_LENGTH);
206 int i = memento.getByteBuffer().getInt();
207 checkDelimiter(memento);
208 return i;
209 }
210
211 protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
212 read(memento, BYTE_LENGTH);
213 boolean isNullObject = memento.getByteBuffer().get() == 0;
214 if (isNullObject) {
215 read(memento, DELIMITER_LENGTH);
216 checkDelimiter(memento);
217 return null;
218 }
219 return readLongPrivate(memento);
220 }
221
222 protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
223 read(memento, LONG_LENGTH + DELIMITER_LENGTH);
224 long num = memento.getByteBuffer().getLong();
225 checkDelimiter(memento);
226 return num;
227 }
228
229 @SuppressWarnings("checkstyle:magicnumber")
230 protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
231 ByteBuffer bb = memento.bb;
232 if ((0xff & bb.get()) != ':') {
233 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
234 }
235 }
236
237 protected final void checkHeader(Memento memento) throws MalformedFrameException {
238 ByteBuffer bb = memento.bb;
239
240 checkDelimiter(memento);
241
242 int shift = 0;
243 try {
244 byte[] header = getEncodedMagicNumber();
245 byte[] bbArray = bb.array();
246 for (int start = bb.arrayOffset() + ((Buffer) bb).position(), length = header.length;
247 shift < length;
248 shift++) {
249 if (bbArray[shift + start] != header[shift]) {
250 throw new MalformedFrameException(
251 memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position() + shift);
252 }
253 }
254 } finally {
255 ((Buffer) bb).position(((Buffer) bb).position() + shift);
256 }
257
258 checkDelimiter(memento);
259 }
260
261 protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
262 if (memento.getData().size() != expectedDataElements) {
263 throw new MalformedFrameException(
264 memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
265 }
266 }
267
268 private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
269 throws IOException, MalformedFrameException {
270 memento.getDecoder().reset();
271 final CharBuffer output = memento.getCharBuffer();
272 ((Buffer) output).clear();
273 final ByteBuffer input = memento.getByteBuffer();
274 final List<String> strings = new ArrayList<>();
275 int countDecodedBytes = 0;
276 for (boolean endOfInput = false; !endOfInput; ) {
277 final int bytesToRead = totalBytes - countDecodedBytes;
278 read(memento, bytesToRead);
279 int bytesToDecode = min(input.remaining(), bytesToRead);
280 final boolean isLastChunk = bytesToDecode == bytesToRead;
281 endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
282 do {
283 boolean endOfChunk = output.remaining() >= bytesToRead;
284 boolean endOfOutput = isLastChunk && endOfChunk;
285 int readInputBytes = decodeString(
286 memento.getDecoder(),
287 input,
288 output,
289 bytesToDecode,
290 endOfOutput,
291 memento.getLine().getPositionByteBuffer());
292 bytesToDecode -= readInputBytes;
293 countDecodedBytes += readInputBytes;
294 } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
295
296 strings.add(((Buffer) output).flip().toString());
297 ((Buffer) output).clear();
298 }
299
300 memento.getDecoder().reset();
301 ((Buffer) output).clear();
302
303 return toString(strings);
304 }
305
306 private static int decodeString(
307 @Nonnull CharsetDecoder decoder,
308 @Nonnull ByteBuffer input,
309 @Nonnull CharBuffer output,
310 @Nonnegative int bytesToDecode,
311 boolean endOfInput,
312 @Nonnegative int errorStreamFrom)
313 throws MalformedFrameException {
314 int limit = ((Buffer) input).limit();
315 ((Buffer) input).limit(((Buffer) input).position() + bytesToDecode);
316
317 CoderResult result = decoder.decode(input, output, endOfInput);
318 if (result.isError() || result.isMalformed()) {
319 throw new MalformedFrameException(errorStreamFrom, ((Buffer) input).position());
320 }
321
322 int decodedBytes = bytesToDecode - input.remaining();
323 ((Buffer) input).limit(limit);
324 return decodedBytes;
325 }
326
327 private static String toString(List<String> strings) {
328 if (strings.size() == 1) {
329 return strings.get(0);
330 }
331 StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
332 for (String s : strings) {
333 concatenated.append(s);
334 }
335 return concatenated.toString();
336 }
337
338 private void printCorruptedStream(Memento memento) {
339 ByteBuffer bb = memento.getByteBuffer();
340 if (bb.hasRemaining()) {
341 int bytesToWrite = bb.remaining();
342 memento.getLine().write(bb, ((Buffer) bb).position(), bytesToWrite);
343 ((Buffer) bb).position(((Buffer) bb).position() + bytesToWrite);
344 }
345 }
346
347
348
349
350
351
352 protected final void printRemainingStream(Memento memento) {
353 printCorruptedStream(memento);
354 memento.getLine().printExistingLine();
355 memento.getLine().clear();
356 }
357
358
359
360
361 public static final class Segment {
362 private final byte[] array;
363 private final int fromIndex;
364 private final int length;
365 private final int hashCode;
366
367 public Segment(byte[] array, int fromIndex, int length) {
368 this.array = array;
369 this.fromIndex = fromIndex;
370 this.length = length;
371
372 int hashCode = 0;
373 int i = fromIndex;
374 for (int loops = length >> 1; loops-- != 0; ) {
375 hashCode = 31 * hashCode + array[i++];
376 hashCode = 31 * hashCode + array[i++];
377 }
378 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
379 }
380
381 @Override
382 public int hashCode() {
383 return hashCode;
384 }
385
386 @Override
387 public boolean equals(Object obj) {
388 if (!(obj instanceof Segment)) {
389 return false;
390 }
391
392 Segment that = (Segment) obj;
393 if (that.length != length) {
394 return false;
395 }
396
397 for (int i = 0; i < length; i++) {
398 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
399 return false;
400 }
401 }
402 return true;
403 }
404 }
405
406 protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
407 ByteBuffer buffer = memento.getByteBuffer();
408 if (buffer.remaining() >= recommendedCount && ((Buffer) buffer).limit() != 0) {
409 return OVERFLOW;
410 } else {
411 if (((Buffer) buffer).position() != 0
412 && recommendedCount > buffer.capacity() - ((Buffer) buffer).position()) {
413 ((Buffer) buffer.compact()).flip();
414 memento.getLine().setPositionByteBuffer(0);
415 }
416 int mark = ((Buffer) buffer).position();
417 ((Buffer) buffer).position(((Buffer) buffer).limit());
418 ((Buffer) buffer).limit(min(((Buffer) buffer).position() + recommendedCount, buffer.capacity()));
419 return read(buffer, mark, recommendedCount);
420 }
421 }
422
423 private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
424 StreamReadStatus readStatus = null;
425 boolean isEnd = false;
426 try {
427 while (!isEnd
428 && ((Buffer) buffer).position() - oldPosition < recommendedCount
429 && ((Buffer) buffer).position() < ((Buffer) buffer).limit()) {
430 isEnd = channel.read(buffer) == -1;
431 }
432 } finally {
433 ((Buffer) buffer).limit(((Buffer) buffer).position());
434 ((Buffer) buffer).position(oldPosition);
435 int readBytes = buffer.remaining();
436 boolean readComplete = readBytes >= recommendedCount;
437 if (!isEnd || readComplete) {
438 debugStream(buffer.array(), buffer.arrayOffset() + ((Buffer) buffer).position(), buffer.remaining());
439 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
440 }
441 }
442
443 if (readStatus == null) {
444 throw new EOFException();
445 } else {
446 return readStatus;
447 }
448 }
449
450
451
452
453 public final class Memento {
454 private CharsetDecoder currentDecoder;
455 private final CharsetDecoder defaultDecoder;
456 private final BufferedStream line = new BufferedStream(32);
457 private final List<Object> data = new ArrayList<>();
458 private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
459 private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
460
461 public Memento() {
462 defaultDecoder = DEFAULT_STREAM_ENCODING
463 .newDecoder()
464 .onMalformedInput(REPLACE)
465 .onUnmappableCharacter(REPLACE);
466 ((Buffer) bb).limit(0);
467 }
468
469 public void reset() {
470 currentDecoder = null;
471 data.clear();
472 }
473
474 public CharsetDecoder getDecoder() {
475 return currentDecoder == null ? defaultDecoder : currentDecoder;
476 }
477
478 public void setCharset(Charset charset) {
479 if (charset.name().equals(defaultDecoder.charset().name())) {
480 currentDecoder = defaultDecoder;
481 } else {
482 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
483 }
484 }
485
486 public BufferedStream getLine() {
487 return line;
488 }
489
490 public List<Object> getData() {
491 return data;
492 }
493
494 public <T> T ofDataAt(int indexOfData) {
495
496 return (T) data.get(indexOfData);
497 }
498
499 public CharBuffer getCharBuffer() {
500 return cb;
501 }
502
503 public ByteBuffer getByteBuffer() {
504 return bb;
505 }
506 }
507
508
509
510
511 public final class BufferedStream {
512 private byte[] buffer;
513 private int count;
514 private int positionByteBuffer;
515 private boolean isNewLine;
516
517 BufferedStream(int capacity) {
518 this.buffer = new byte[capacity];
519 }
520
521 public int getPositionByteBuffer() {
522 return positionByteBuffer;
523 }
524
525 public void setPositionByteBuffer(int positionByteBuffer) {
526 this.positionByteBuffer = positionByteBuffer;
527 }
528
529 public void write(ByteBuffer bb, int position, int length) {
530 ensureCapacity(length);
531 byte[] array = bb.array();
532 int pos = bb.arrayOffset() + position;
533 while (length-- > 0) {
534 positionByteBuffer++;
535 byte b = array[pos++];
536 if (b == '\r' || b == '\n') {
537 if (!isNewLine) {
538 printExistingLine();
539 count = 0;
540 }
541 isNewLine = true;
542 } else {
543 buffer[count++] = b;
544 isNewLine = false;
545 }
546 }
547 }
548
549 public void clear() {
550 count = 0;
551 }
552
553 @Override
554 public String toString() {
555 return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
556 }
557
558 private boolean isEmpty() {
559 return count == 0;
560 }
561
562 private void ensureCapacity(int addCapacity) {
563 int oldCapacity = buffer.length;
564 int exactCapacity = count + addCapacity;
565 if (exactCapacity < 0) {
566 throw new OutOfMemoryError();
567 }
568
569 if (oldCapacity < exactCapacity) {
570 int newCapacity = oldCapacity << 1;
571 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
572 }
573 }
574
575 void printExistingLine() {
576 if (isEmpty()) {
577 return;
578 }
579
580 String s = toString();
581 if (isBlank(s)) {
582 return;
583 }
584
585 if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
586 if (logger.isDebugEnabled()) {
587 logger.debug(s);
588 } else if (logger.isInfoEnabled()) {
589 logger.info(s);
590 } else {
591
592 System.out.println(s);
593 }
594 } else {
595 if (isJvmError(s)) {
596 logger.error(s);
597 } else if (logger.isDebugEnabled()) {
598 logger.debug(s);
599 }
600
601 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
602 + arguments.getForkChannelId() + ".";
603 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
604 String dumpPath = dumpFile.getAbsolutePath();
605 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
606 }
607 }
608
609 private boolean isJvmError(String line) {
610 String lineLower = line.toLowerCase();
611 for (String errorPattern : JVM_ERROR_PATTERNS) {
612 if (lineLower.contains(errorPattern)) {
613 return true;
614 }
615 }
616 return false;
617 }
618 }
619
620
621
622
623 public static final class MalformedFrameException extends Exception {
624 private final int readFrom;
625 private final int readTo;
626
627 public MalformedFrameException(int readFrom, int readTo) {
628 this.readFrom = readFrom;
629 this.readTo = readTo;
630 }
631
632 public int readFrom() {
633 return readFrom;
634 }
635
636 public int readTo() {
637 return readTo;
638 }
639
640 public boolean hasValidPositions() {
641 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
642 }
643 }
644
645
646
647
648
649
650
651
652 public enum StreamReadStatus {
653 UNDERFLOW,
654 OVERFLOW,
655 EOF
656 }
657 }