1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.stream;
20
21 import javax.annotation.Nonnegative;
22 import javax.annotation.Nonnull;
23
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.IOException;
27 import java.nio.ByteBuffer;
28 import java.nio.CharBuffer;
29 import java.nio.channels.ReadableByteChannel;
30 import java.nio.charset.Charset;
31 import java.nio.charset.CharsetDecoder;
32 import java.nio.charset.CoderResult;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39
40 import static java.lang.Math.max;
41 import static java.lang.Math.min;
42 import static java.nio.charset.CodingErrorAction.REPLACE;
43 import static java.nio.charset.StandardCharsets.US_ASCII;
44 import static java.util.Arrays.copyOf;
45 import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48 import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49
50
51
52
53
54
55 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
56 public static final int BUFFER_SIZE = 1024;
57
58 private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
59
60 private static final String[] JVM_ERROR_PATTERNS = {
61 "could not create the java virtual machine",
62 "error occurred during initialization",
63 "error:",
64 "could not reserve enough space",
65 "could not allocate",
66 "unable to allocate",
67 "java.lang.module.findexception"
68 };
69
70 private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
71 DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
72
73 private static final int NO_POSITION = -1;
74 private static final int DELIMITER_LENGTH = 1;
75 private static final int BYTE_LENGTH = 1;
76 private static final int INT_LENGTH = 4;
77 private static final int LONG_LENGTH = 8;
78
79 private final ReadableByteChannel channel;
80 private final ForkNodeArguments arguments;
81 private final Map<Segment, MT> messageTypes;
82 private final ConsoleLogger logger;
83
84 protected AbstractStreamDecoder(
85 @Nonnull ReadableByteChannel channel,
86 @Nonnull ForkNodeArguments arguments,
87 @Nonnull Map<Segment, MT> messageTypes) {
88 this.channel = channel;
89 this.arguments = arguments;
90 this.messageTypes = messageTypes;
91 logger = arguments.getConsoleLogger();
92 }
93
94 public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
95
96 @Nonnull
97 protected abstract byte[] getEncodedMagicNumber();
98
99 @Nonnull
100 protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
101
102 @Nonnull
103 protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104
105 @Nonnull
106 protected final ForkNodeArguments getArguments() {
107 return arguments;
108 }
109
110 protected void debugStream(byte[] array, int position, int remaining) {}
111
112 protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113 byte[] header = getEncodedMagicNumber();
114
115
116
117
118
119 read(memento, DELIMITER_LENGTH);
120 ByteBuffer bb = memento.getByteBuffer();
121 if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != ':') {
122 checkHeader(memento);
123 }
124 checkDelimiter(memento);
125 for (int i = 0; i < header.length; i++) {
126 read(memento, DELIMITER_LENGTH);
127 bb = memento.getByteBuffer();
128 if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != (header[i] & 0xff)) {
129
130 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + 1);
131 }
132 bb.position(bb.position() + 1);
133 }
134 read(memento, DELIMITER_LENGTH);
135 checkDelimiter(memento);
136 return messageTypes.get(readSegment(memento));
137 }
138
139 @Nonnull
140 @SuppressWarnings("checkstyle:magicnumber")
141 protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
142 int readCount = readByte(memento) & 0xff;
143 read(memento, readCount + DELIMITER_LENGTH);
144 ByteBuffer bb = memento.getByteBuffer();
145 Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
146 bb.position(bb.position() + readCount);
147 checkDelimiter(memento);
148 return segment;
149 }
150
151 @Nonnull
152 @SuppressWarnings("checkstyle:magicnumber")
153 protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
154 int length = readByte(memento) & 0xff;
155 read(memento, length + DELIMITER_LENGTH);
156 ByteBuffer bb = memento.getByteBuffer();
157 byte[] array = bb.array();
158 int offset = bb.arrayOffset() + bb.position();
159 bb.position(bb.position() + length);
160 boolean isDefaultEncoding = false;
161 if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
162 isDefaultEncoding = true;
163 for (int i = 0; i < length; i++) {
164 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
165 }
166 }
167
168 try {
169 Charset charset = isDefaultEncoding
170 ? DEFAULT_STREAM_ENCODING
171 : Charset.forName(new String(array, offset, length, US_ASCII));
172
173 checkDelimiter(memento);
174 return charset;
175 } catch (IllegalArgumentException e) {
176 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
177 }
178 }
179
180 protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
181 (memento.getCharBuffer()).clear();
182 int readCount = readInt(memento);
183 if (readCount < 0) {
184 throw new MalformedFrameException(
185 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
186 }
187 read(memento, readCount + DELIMITER_LENGTH);
188
189 final String string;
190 if (readCount == 0) {
191 string = "";
192 } else if (readCount == 1) {
193 read(memento, 1);
194 byte oneChar = memento.getByteBuffer().get();
195 string = oneChar == 0 ? null : String.valueOf((char) oneChar);
196 } else {
197 string = readString(memento, readCount);
198 }
199 read(memento, 1);
200 checkDelimiter(memento);
201 return string;
202 }
203
204 protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
205 read(memento, BYTE_LENGTH);
206 boolean isNullObject = memento.getByteBuffer().get() == 0;
207 if (isNullObject) {
208 read(memento, DELIMITER_LENGTH);
209 checkDelimiter(memento);
210 return null;
211 }
212 return readInt(memento);
213 }
214
215 protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
216 read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
217 byte b = memento.getByteBuffer().get();
218 checkDelimiter(memento);
219 return b;
220 }
221
222 protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
223 read(memento, INT_LENGTH + DELIMITER_LENGTH);
224 int i = memento.getByteBuffer().getInt();
225 checkDelimiter(memento);
226 return i;
227 }
228
229 protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
230 read(memento, BYTE_LENGTH);
231 boolean isNullObject = memento.getByteBuffer().get() == 0;
232 if (isNullObject) {
233 read(memento, DELIMITER_LENGTH);
234 checkDelimiter(memento);
235 return null;
236 }
237 return readLongPrivate(memento);
238 }
239
240 protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
241 read(memento, LONG_LENGTH + DELIMITER_LENGTH);
242 long num = memento.getByteBuffer().getLong();
243 checkDelimiter(memento);
244 return num;
245 }
246
247 @SuppressWarnings("checkstyle:magicnumber")
248 protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
249 ByteBuffer bb = memento.bb;
250 if ((0xff & bb.get()) != ':') {
251 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
252 }
253 }
254
255 protected final void checkHeader(Memento memento) throws MalformedFrameException {
256 ByteBuffer bb = memento.bb;
257
258 checkDelimiter(memento);
259
260 int shift = 0;
261 try {
262 byte[] header = getEncodedMagicNumber();
263 byte[] bbArray = bb.array();
264 for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
265 if (bbArray[shift + start] != header[shift]) {
266 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
267 }
268 }
269 } finally {
270 bb.position(bb.position() + shift);
271 }
272
273 checkDelimiter(memento);
274 }
275
276 protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
277 if (memento.getData().size() != expectedDataElements) {
278 throw new MalformedFrameException(
279 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
280 }
281 }
282
283 private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
284 throws IOException, MalformedFrameException {
285 memento.getDecoder().reset();
286 final CharBuffer output = memento.getCharBuffer();
287 (output).clear();
288 final ByteBuffer input = memento.getByteBuffer();
289 final List<String> strings = new ArrayList<>();
290 int countDecodedBytes = 0;
291 for (boolean endOfInput = false; !endOfInput; ) {
292 final int bytesToRead = totalBytes - countDecodedBytes;
293 read(memento, bytesToRead);
294 int bytesToDecode = min(input.remaining(), bytesToRead);
295 final boolean isLastChunk = bytesToDecode == bytesToRead;
296 endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
297 do {
298 boolean endOfChunk = output.remaining() >= bytesToRead;
299 boolean endOfOutput = isLastChunk && endOfChunk;
300 int readInputBytes = decodeString(
301 memento.getDecoder(),
302 input,
303 output,
304 bytesToDecode,
305 endOfOutput,
306 memento.getLine().getPositionByteBuffer());
307 bytesToDecode -= readInputBytes;
308 countDecodedBytes += readInputBytes;
309 } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
310
311 strings.add((output).flip().toString());
312 (output).clear();
313 }
314
315 memento.getDecoder().reset();
316 (output).clear();
317
318 return toString(strings);
319 }
320
321 private static int decodeString(
322 @Nonnull CharsetDecoder decoder,
323 @Nonnull ByteBuffer input,
324 @Nonnull CharBuffer output,
325 @Nonnegative int bytesToDecode,
326 boolean endOfInput,
327 @Nonnegative int errorStreamFrom)
328 throws MalformedFrameException {
329 int limit = (input).limit();
330 (input).limit((input).position() + bytesToDecode);
331
332 CoderResult result = decoder.decode(input, output, endOfInput);
333 if (result.isError() || result.isMalformed()) {
334 throw new MalformedFrameException(errorStreamFrom, (input).position());
335 }
336
337 int decodedBytes = bytesToDecode - input.remaining();
338 (input).limit(limit);
339 return decodedBytes;
340 }
341
342 private static String toString(List<String> strings) {
343 if (strings.size() == 1) {
344 return strings.get(0);
345 }
346 StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
347 for (String s : strings) {
348 concatenated.append(s);
349 }
350 return concatenated.toString();
351 }
352
353 private void printCorruptedStream(Memento memento) {
354 ByteBuffer bb = memento.getByteBuffer();
355 if (bb.hasRemaining()) {
356 int bytesToWrite = bb.remaining();
357 memento.getLine().write(bb, bb.position(), bytesToWrite);
358 bb.position(bb.position() + bytesToWrite);
359 }
360 }
361
362
363
364
365
366
367 protected final void printRemainingStream(Memento memento) {
368 printCorruptedStream(memento);
369 memento.getLine().printExistingLine();
370 memento.getLine().clear();
371 }
372
373 public static final class Segment {
374 private final byte[] array;
375 private final int fromIndex;
376 private final int length;
377 private final int hashCode;
378
379 public Segment(byte[] array, int fromIndex, int length) {
380 this.array = array;
381 this.fromIndex = fromIndex;
382 this.length = length;
383
384 int hashCode = 0;
385 int i = fromIndex;
386 for (int loops = length >> 1; loops-- != 0; ) {
387 hashCode = 31 * hashCode + array[i++];
388 hashCode = 31 * hashCode + array[i++];
389 }
390 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
391 }
392
393 @Override
394 public int hashCode() {
395 return hashCode;
396 }
397
398 @Override
399 public boolean equals(Object obj) {
400 if (!(obj instanceof Segment)) {
401 return false;
402 }
403
404 Segment that = (Segment) obj;
405 if (that.length != length) {
406 return false;
407 }
408
409 for (int i = 0; i < length; i++) {
410 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
411 return false;
412 }
413 }
414 return true;
415 }
416 }
417
418 protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
419 ByteBuffer buffer = memento.getByteBuffer();
420 if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
421 return OVERFLOW;
422 } else {
423 if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
424 (buffer.compact()).flip();
425 memento.getLine().setPositionByteBuffer(0);
426 }
427 int mark = buffer.position();
428 buffer.position(buffer.limit());
429 buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
430 return read(buffer, mark, recommendedCount);
431 }
432 }
433
434 private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
435 StreamReadStatus readStatus = null;
436 boolean isEnd = false;
437 try {
438 while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
439 isEnd = channel.read(buffer) == -1;
440 }
441 } finally {
442 buffer.limit(buffer.position());
443 buffer.position(oldPosition);
444 int readBytes = buffer.remaining();
445 boolean readComplete = readBytes >= recommendedCount;
446 if (!isEnd || readComplete) {
447 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
448 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
449 }
450 }
451
452 if (readStatus == null) {
453 throw new EOFException();
454 } else {
455 return readStatus;
456 }
457 }
458
459 public final class Memento {
460 private CharsetDecoder currentDecoder;
461 private final CharsetDecoder defaultDecoder;
462 private final BufferedStream line = new BufferedStream(32);
463 private final List<Object> data = new ArrayList<>();
464 private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
465 private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
466
467 public Memento() {
468 defaultDecoder = DEFAULT_STREAM_ENCODING
469 .newDecoder()
470 .onMalformedInput(REPLACE)
471 .onUnmappableCharacter(REPLACE);
472 bb.limit(0);
473 }
474
475 public void reset() {
476 currentDecoder = null;
477 data.clear();
478 }
479
480 public CharsetDecoder getDecoder() {
481 return currentDecoder == null ? defaultDecoder : currentDecoder;
482 }
483
484 public void setCharset(Charset charset) {
485 if (charset.name().equals(defaultDecoder.charset().name())) {
486 currentDecoder = defaultDecoder;
487 } else {
488 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
489 }
490 }
491
492 public BufferedStream getLine() {
493 return line;
494 }
495
496 public List<Object> getData() {
497 return data;
498 }
499
500 public <T> T ofDataAt(int indexOfData) {
501
502 return (T) data.get(indexOfData);
503 }
504
505 public CharBuffer getCharBuffer() {
506 return cb;
507 }
508
509 public ByteBuffer getByteBuffer() {
510 return bb;
511 }
512 }
513
514
515
516
517 public final class BufferedStream {
518 private byte[] buffer;
519 private int count;
520 private int positionByteBuffer;
521 private boolean isNewLine;
522
523 BufferedStream(int capacity) {
524 this.buffer = new byte[capacity];
525 }
526
527 public int getPositionByteBuffer() {
528 return positionByteBuffer;
529 }
530
531 public void setPositionByteBuffer(int positionByteBuffer) {
532 this.positionByteBuffer = positionByteBuffer;
533 }
534
535 public void write(ByteBuffer bb, int position, int length) {
536 ensureCapacity(length);
537 byte[] array = bb.array();
538 int pos = bb.arrayOffset() + position;
539 while (length-- > 0) {
540 positionByteBuffer++;
541 byte b = array[pos++];
542 if (b == '\r' || b == '\n') {
543 if (!isNewLine) {
544 printExistingLine();
545 count = 0;
546 }
547 isNewLine = true;
548 } else {
549 buffer[count++] = b;
550 isNewLine = false;
551 }
552 }
553 }
554
555 public void clear() {
556 count = 0;
557 }
558
559 @Override
560 public String toString() {
561 return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
562 }
563
564 private boolean isEmpty() {
565 return count == 0;
566 }
567
568 private void ensureCapacity(int addCapacity) {
569 int oldCapacity = buffer.length;
570 int exactCapacity = count + addCapacity;
571 if (exactCapacity < 0) {
572 throw new OutOfMemoryError();
573 }
574
575 if (oldCapacity < exactCapacity) {
576 int newCapacity = oldCapacity << 1;
577 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
578 }
579 }
580
581 void printExistingLine() {
582 if (isEmpty()) {
583 return;
584 }
585
586 String s = toString();
587 if (isBlank(s)) {
588 return;
589 }
590
591 if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
592 if (logger.isDebugEnabled()) {
593 logger.debug(s);
594 } else if (logger.isInfoEnabled()) {
595 logger.info(s);
596 } else {
597
598 System.out.println(s);
599 }
600 } else {
601 if (isJvmError(s)) {
602 logger.error(s);
603 } else if (logger.isDebugEnabled()) {
604 logger.debug(s);
605 }
606
607 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
608 + arguments.getForkChannelId() + ".";
609 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
610 String dumpPath = dumpFile.getAbsolutePath();
611 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
612 }
613 }
614
615 private boolean isJvmError(String line) {
616 String lineLower = line.toLowerCase();
617 for (String errorPattern : JVM_ERROR_PATTERNS) {
618 if (lineLower.contains(errorPattern)) {
619 return true;
620 }
621 }
622 return false;
623 }
624 }
625
626 public static final class MalformedFrameException extends Exception {
627 private final int readFrom;
628 private final int readTo;
629
630 public MalformedFrameException(int readFrom, int readTo) {
631 this.readFrom = readFrom;
632 this.readTo = readTo;
633 }
634
635 public int readFrom() {
636 return readFrom;
637 }
638
639 public int readTo() {
640 return readTo;
641 }
642
643 public boolean hasValidPositions() {
644 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
645 }
646 }
647
648
649
650
651
652
653
654
655 public enum StreamReadStatus {
656 UNDERFLOW,
657 OVERFLOW,
658 EOF
659 }
660 }