1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.surefire.api.stream;
20
21 import javax.annotation.Nonnegative;
22 import javax.annotation.Nonnull;
23
24 import java.io.EOFException;
25 import java.io.File;
26 import java.io.IOException;
27 import java.nio.ByteBuffer;
28 import java.nio.CharBuffer;
29 import java.nio.channels.ReadableByteChannel;
30 import java.nio.charset.Charset;
31 import java.nio.charset.CharsetDecoder;
32 import java.nio.charset.CoderResult;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Map;
36
37 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39
40 import static java.lang.Math.max;
41 import static java.lang.Math.min;
42 import static java.nio.charset.CodingErrorAction.REPLACE;
43 import static java.nio.charset.StandardCharsets.US_ASCII;
44 import static java.util.Arrays.copyOf;
45 import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48 import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49
50
51
52
53
54
55 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
// Capacity of the byte and char buffers allocated by each Memento; also used as the
// per-chunk capacity hint when decoded string chunks are concatenated.
56 public static final int BUFFER_SIZE = 1024;
57
// A native-stream line containing this text is logged as regular output by
// BufferedStream.printExistingLine() instead of being reported as a corrupted channel.
58 private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
59
// Lower-case substrings; a native-stream line whose lower-cased text contains any of
// them is logged at error level (see BufferedStream.isJvmError).
60 private static final String[] JVM_ERROR_PATTERNS = {
61 "could not create the java virtual machine",
62 "error occurred during initialization",
63 "error:",
64 "could not reserve enough space",
65 "could not allocate",
66 "unable to allocate",
67 "java.lang.module.findexception"
68 };
69
// US-ASCII bytes of the default stream encoding's name; readCharset() compares incoming
// charset names against these bytes to avoid a Charset lookup for the common case.
70 private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
71 DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
72
// Sentinel checked by MalformedFrameException.hasValidPositions().
73 private static final int NO_POSITION = -1;
// Sizes, in bytes, of the frame elements consumed by the read* helpers.
74 private static final int DELIMITER_LENGTH = 1;
75 private static final int BYTE_LENGTH = 1;
76 private static final int INT_LENGTH = 4;
77 private static final int LONG_LENGTH = 8;
78
// Source of the encoded bytes.
79 private final ReadableByteChannel channel;
// Fork node arguments supplied to the constructor; used for dumps and deferred warnings.
80 private final ForkNodeArguments arguments;
// Lookup from an encoded segment to the message type it denotes.
81 private final Map<Segment, MT> messageTypes;
// Console logger obtained from the fork node arguments.
82 private final ConsoleLogger logger;
83
/**
 * Creates a decoder bound to the given channel.
 *
 * @param channel      channel the encoded bytes are read from
 * @param arguments    fork node arguments; also the source of the console logger
 * @param messageTypes lookup from an encoded segment to its message type
 */
protected AbstractStreamDecoder(
        @Nonnull ReadableByteChannel channel,
        @Nonnull ForkNodeArguments arguments,
        @Nonnull Map<Segment, MT> messageTypes) {
    this.logger = arguments.getConsoleLogger();
    this.messageTypes = messageTypes;
    this.arguments = arguments;
    this.channel = channel;
}
93
/**
 * Produces a message of type {@code M} using the state held in the given memento.
 * NOTE(review): the exact contract is defined by subclasses and is not visible in this file;
 * presumably one call decodes one frame from the channel - confirm against implementations.
 *
 * @param memento per-stream decoding state (buffers, decoder, collected data)
 * @return the decoded message
 * @throws MalformedChannelException declared by the contract; not thrown by code visible here
 * @throws IOException if reading from the channel fails
 */
94 public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
95
/**
 * Returns the bytes that every frame header must contain between its first two
 * ':' delimiters. {@code checkHeader} compares the buffered bytes against this array
 * and {@code readMessageType} uses its length to size the header read.
 *
 * @return the encoded magic number
 */
96 @Nonnull
97 protected abstract byte[] getEncodedMagicNumber();
98
/**
 * Returns segment types associated with the given message type.
 * NOTE(review): not called anywhere in the visible part of this file; presumably the returned
 * array lists, in order, the segments that follow a header of this type - confirm with subclasses.
 *
 * @param messageType the message type read from the frame header
 * @return the segment types for that message type
 */
99 @Nonnull
100 protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
101
/**
 * Builds a message of the given type from the state held in the memento.
 * NOTE(review): not called in the visible part of this file; presumably it consumes the values
 * collected in {@code memento.getData()} - confirm with subclasses.
 *
 * @param messageType the message type of the frame
 * @param memento     per-stream decoding state
 * @return the constructed message
 * @throws MalformedFrameException if the frame content cannot be turned into a message
 */
102 @Nonnull
103 protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104
/**
 * Returns the fork node arguments that were passed to the constructor.
 *
 * @return the fork node arguments
 */
105 @Nonnull
106 protected final ForkNodeArguments getArguments() {
107 return arguments;
108 }
109
/**
 * Hook invoked by the channel-reading routine after a read attempt that did not end in
 * a premature end of stream. The default implementation does nothing.
 *
 * @param array     backing array of the byte buffer
 * @param position  index in {@code array} of the first available byte
 * @param remaining number of available bytes starting at {@code position}
 */
110 protected void debugStream(byte[] array, int position, int remaining) {}
111
112 protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113 byte[] header = getEncodedMagicNumber();
114 int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
115 read(memento, readCount);
116 checkHeader(memento);
117 return messageTypes.get(readSegment(memento));
118 }
119
120 @Nonnull
121 @SuppressWarnings("checkstyle:magicnumber")
122 protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
123 int readCount = readByte(memento) & 0xff;
124 read(memento, readCount + DELIMITER_LENGTH);
125 ByteBuffer bb = memento.getByteBuffer();
126 Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
127 bb.position(bb.position() + readCount);
128 checkDelimiter(memento);
129 return segment;
130 }
131
132 @Nonnull
133 @SuppressWarnings("checkstyle:magicnumber")
134 protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
135 int length = readByte(memento) & 0xff;
136 read(memento, length + DELIMITER_LENGTH);
137 ByteBuffer bb = memento.getByteBuffer();
138 byte[] array = bb.array();
139 int offset = bb.arrayOffset() + bb.position();
140 bb.position(bb.position() + length);
141 boolean isDefaultEncoding = false;
142 if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
143 isDefaultEncoding = true;
144 for (int i = 0; i < length; i++) {
145 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
146 }
147 }
148
149 try {
150 Charset charset = isDefaultEncoding
151 ? DEFAULT_STREAM_ENCODING
152 : Charset.forName(new String(array, offset, length, US_ASCII));
153
154 checkDelimiter(memento);
155 return charset;
156 } catch (IllegalArgumentException e) {
157 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
158 }
159 }
160
161 protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
162 (memento.getCharBuffer()).clear();
163 int readCount = readInt(memento);
164 if (readCount < 0) {
165 throw new MalformedFrameException(
166 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
167 }
168 read(memento, readCount + DELIMITER_LENGTH);
169
170 final String string;
171 if (readCount == 0) {
172 string = "";
173 } else if (readCount == 1) {
174 read(memento, 1);
175 byte oneChar = memento.getByteBuffer().get();
176 string = oneChar == 0 ? null : String.valueOf((char) oneChar);
177 } else {
178 string = readString(memento, readCount);
179 }
180 read(memento, 1);
181 checkDelimiter(memento);
182 return string;
183 }
184
185 protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
186 read(memento, BYTE_LENGTH);
187 boolean isNullObject = memento.getByteBuffer().get() == 0;
188 if (isNullObject) {
189 read(memento, DELIMITER_LENGTH);
190 checkDelimiter(memento);
191 return null;
192 }
193 return readInt(memento);
194 }
195
196 protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
197 read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
198 byte b = memento.getByteBuffer().get();
199 checkDelimiter(memento);
200 return b;
201 }
202
203 protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
204 read(memento, INT_LENGTH + DELIMITER_LENGTH);
205 int i = memento.getByteBuffer().getInt();
206 checkDelimiter(memento);
207 return i;
208 }
209
210 protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
211 read(memento, BYTE_LENGTH);
212 boolean isNullObject = memento.getByteBuffer().get() == 0;
213 if (isNullObject) {
214 read(memento, DELIMITER_LENGTH);
215 checkDelimiter(memento);
216 return null;
217 }
218 return readLongPrivate(memento);
219 }
220
221 protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
222 read(memento, LONG_LENGTH + DELIMITER_LENGTH);
223 long num = memento.getByteBuffer().getLong();
224 checkDelimiter(memento);
225 return num;
226 }
227
228 @SuppressWarnings("checkstyle:magicnumber")
229 protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
230 ByteBuffer bb = memento.bb;
231 if ((0xff & bb.get()) != ':') {
232 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
233 }
234 }
235
236 protected final void checkHeader(Memento memento) throws MalformedFrameException {
237 ByteBuffer bb = memento.bb;
238
239 checkDelimiter(memento);
240
241 int shift = 0;
242 try {
243 byte[] header = getEncodedMagicNumber();
244 byte[] bbArray = bb.array();
245 for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
246 if (bbArray[shift + start] != header[shift]) {
247 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
248 }
249 }
250 } finally {
251 bb.position(bb.position() + shift);
252 }
253
254 checkDelimiter(memento);
255 }
256
257 protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
258 if (memento.getData().size() != expectedDataElements) {
259 throw new MalformedFrameException(
260 memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
261 }
262 }
263
/**
 * Decodes {@code totalBytes} encoded bytes into a string using the memento's decoder.
 *
 * <p>The bytes are requested from the channel and decoded piece by piece; each time the
 * char buffer is drained its content is stored as one string, and the pieces are
 * concatenated at the end. The decoder is reset and the char buffer is cleared both before
 * and after decoding.
 *
 * @param memento    per-stream decoding state (byte buffer, char buffer, decoder)
 * @param totalBytes number of encoded bytes that make up the string
 * @return the decoded string
 * @throws IOException             if reading from the channel fails
 * @throws MalformedFrameException if the decoder reports an error
 */
264 private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
265 throws IOException, MalformedFrameException {
266 memento.getDecoder().reset();
267 final CharBuffer output = memento.getCharBuffer();
268 (output).clear();
269 final ByteBuffer input = memento.getByteBuffer();
270 final List<String> strings = new ArrayList<>();
271 int countDecodedBytes = 0;
// One iteration per read from the channel; ends once all totalBytes are accounted for.
272 for (boolean endOfInput = false; !endOfInput; ) {
// Ask for everything that is still missing; the buffer may deliver less.
273 final int bytesToRead = totalBytes - countDecodedBytes;
274 read(memento, bytesToRead);
275 int bytesToDecode = min(input.remaining(), bytesToRead);
// True when the buffered bytes cover the whole rest of the string.
276 final boolean isLastChunk = bytesToDecode == bytesToRead;
277 endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
278 do {
// The decoder is told "end of input" only when this is the last chunk and the char
// buffer has at least bytesToRead chars of free space.
279 boolean endOfChunk = output.remaining() >= bytesToRead;
280 boolean endOfOutput = isLastChunk && endOfChunk;
281 int readInputBytes = decodeString(
282 memento.getDecoder(),
283 input,
284 output,
285 bytesToDecode,
286 endOfOutput,
287 memento.getLine().getPositionByteBuffer());
288 bytesToDecode -= readInputBytes;
289 countDecodedBytes += readInputBytes;
// Keep decoding the last chunk while bytes are left and the char buffer still has room.
290 } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
291
// Drain the char buffer into a string piece and make it writable again.
292 strings.add((output).flip().toString());
293 (output).clear();
294 }
295
296 memento.getDecoder().reset();
297 (output).clear();
298
299 return toString(strings);
300 }
301
302 private static int decodeString(
303 @Nonnull CharsetDecoder decoder,
304 @Nonnull ByteBuffer input,
305 @Nonnull CharBuffer output,
306 @Nonnegative int bytesToDecode,
307 boolean endOfInput,
308 @Nonnegative int errorStreamFrom)
309 throws MalformedFrameException {
310 int limit = (input).limit();
311 (input).limit((input).position() + bytesToDecode);
312
313 CoderResult result = decoder.decode(input, output, endOfInput);
314 if (result.isError() || result.isMalformed()) {
315 throw new MalformedFrameException(errorStreamFrom, (input).position());
316 }
317
318 int decodedBytes = bytesToDecode - input.remaining();
319 (input).limit(limit);
320 return decodedBytes;
321 }
322
323 private static String toString(List<String> strings) {
324 if (strings.size() == 1) {
325 return strings.get(0);
326 }
327 StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
328 for (String s : strings) {
329 concatenated.append(s);
330 }
331 return concatenated.toString();
332 }
333
334 private void printCorruptedStream(Memento memento) {
335 ByteBuffer bb = memento.getByteBuffer();
336 if (bb.hasRemaining()) {
337 int bytesToWrite = bb.remaining();
338 memento.getLine().write(bb, bb.position(), bytesToWrite);
339 bb.position(bb.position() + bytesToWrite);
340 }
341 }
342
343
344
345
346
347
348 protected final void printRemainingStream(Memento memento) {
349 printCorruptedStream(memento);
350 memento.getLine().printExistingLine();
351 memento.getLine().clear();
352 }
353
354
355
356
357 public static final class Segment {
358 private final byte[] array;
359 private final int fromIndex;
360 private final int length;
361 private final int hashCode;
362
363 public Segment(byte[] array, int fromIndex, int length) {
364 this.array = array;
365 this.fromIndex = fromIndex;
366 this.length = length;
367
368 int hashCode = 0;
369 int i = fromIndex;
370 for (int loops = length >> 1; loops-- != 0; ) {
371 hashCode = 31 * hashCode + array[i++];
372 hashCode = 31 * hashCode + array[i++];
373 }
374 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
375 }
376
377 @Override
378 public int hashCode() {
379 return hashCode;
380 }
381
382 @Override
383 public boolean equals(Object obj) {
384 if (!(obj instanceof Segment)) {
385 return false;
386 }
387
388 Segment that = (Segment) obj;
389 if (that.length != length) {
390 return false;
391 }
392
393 for (int i = 0; i < length; i++) {
394 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
395 return false;
396 }
397 }
398 return true;
399 }
400 }
401
402 protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
403 ByteBuffer buffer = memento.getByteBuffer();
404 if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
405 return OVERFLOW;
406 } else {
407 if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
408 (buffer.compact()).flip();
409 memento.getLine().setPositionByteBuffer(0);
410 }
411 int mark = buffer.position();
412 buffer.position(buffer.limit());
413 buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
414 return read(buffer, mark, recommendedCount);
415 }
416 }
417
/**
 * Reads from the channel into {@code buffer} and then restores the buffer to reading mode.
 *
 * <p>On entry the buffer is expected to be positioned where new bytes should be appended.
 * On exit the position is {@code oldPosition} and the limit marks the end of the data, so
 * {@code remaining()} is the number of bytes available from {@code oldPosition}.
 *
 * @param buffer           buffer to fill
 * @param oldPosition      position of the first unread byte, restored before returning
 * @param recommendedCount number of bytes wanted from {@code oldPosition}
 * @return {@code OVERFLOW} when at least {@code recommendedCount} bytes are available,
 *         {@code UNDERFLOW} when fewer are available and the channel has not ended
 * @throws EOFException if the channel ended before {@code recommendedCount} bytes were available
 * @throws IOException  if the channel read fails
 */
418 private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
419 StreamReadStatus readStatus = null;
420 boolean isEnd = false;
421 try {
// Stop on end of stream, when enough bytes were gathered, or when the buffer window is full.
422 while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
423 isEnd = channel.read(buffer) == -1;
424 }
425 } finally {
// Runs even when channel.read throws: flip the window back to reading mode.
426 buffer.limit(buffer.position());
427 buffer.position(oldPosition);
428 int readBytes = buffer.remaining();
429 boolean readComplete = readBytes >= recommendedCount;
// A status is produced unless the stream ended with too few bytes.
430 if (!isEnd || readComplete) {
431 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
432 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
433 }
434 }
435
// No status means the channel ended before the requested bytes arrived.
436 if (readStatus == null) {
437 throw new EOFException();
438 } else {
439 return readStatus;
440 }
441 }
442
443
444
445
446 public final class Memento {
447 private CharsetDecoder currentDecoder;
448 private final CharsetDecoder defaultDecoder;
449 private final BufferedStream line = new BufferedStream(32);
450 private final List<Object> data = new ArrayList<>();
451 private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
452 private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
453
454 public Memento() {
455 defaultDecoder = DEFAULT_STREAM_ENCODING
456 .newDecoder()
457 .onMalformedInput(REPLACE)
458 .onUnmappableCharacter(REPLACE);
459 bb.limit(0);
460 }
461
462 public void reset() {
463 currentDecoder = null;
464 data.clear();
465 }
466
467 public CharsetDecoder getDecoder() {
468 return currentDecoder == null ? defaultDecoder : currentDecoder;
469 }
470
471 public void setCharset(Charset charset) {
472 if (charset.name().equals(defaultDecoder.charset().name())) {
473 currentDecoder = defaultDecoder;
474 } else {
475 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
476 }
477 }
478
479 public BufferedStream getLine() {
480 return line;
481 }
482
483 public List<Object> getData() {
484 return data;
485 }
486
487 public <T> T ofDataAt(int indexOfData) {
488
489 return (T) data.get(indexOfData);
490 }
491
492 public CharBuffer getCharBuffer() {
493 return cb;
494 }
495
496 public ByteBuffer getByteBuffer() {
497 return bb;
498 }
499 }
500
501
502
503
504 public final class BufferedStream {
505 private byte[] buffer;
506 private int count;
507 private int positionByteBuffer;
508 private boolean isNewLine;
509
510 BufferedStream(int capacity) {
511 this.buffer = new byte[capacity];
512 }
513
514 public int getPositionByteBuffer() {
515 return positionByteBuffer;
516 }
517
518 public void setPositionByteBuffer(int positionByteBuffer) {
519 this.positionByteBuffer = positionByteBuffer;
520 }
521
522 public void write(ByteBuffer bb, int position, int length) {
523 ensureCapacity(length);
524 byte[] array = bb.array();
525 int pos = bb.arrayOffset() + position;
526 while (length-- > 0) {
527 positionByteBuffer++;
528 byte b = array[pos++];
529 if (b == '\r' || b == '\n') {
530 if (!isNewLine) {
531 printExistingLine();
532 count = 0;
533 }
534 isNewLine = true;
535 } else {
536 buffer[count++] = b;
537 isNewLine = false;
538 }
539 }
540 }
541
542 public void clear() {
543 count = 0;
544 }
545
546 @Override
547 public String toString() {
548 return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
549 }
550
551 private boolean isEmpty() {
552 return count == 0;
553 }
554
555 private void ensureCapacity(int addCapacity) {
556 int oldCapacity = buffer.length;
557 int exactCapacity = count + addCapacity;
558 if (exactCapacity < 0) {
559 throw new OutOfMemoryError();
560 }
561
562 if (oldCapacity < exactCapacity) {
563 int newCapacity = oldCapacity << 1;
564 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
565 }
566 }
567
568 void printExistingLine() {
569 if (isEmpty()) {
570 return;
571 }
572
573 String s = toString();
574 if (isBlank(s)) {
575 return;
576 }
577
578 if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
579 if (logger.isDebugEnabled()) {
580 logger.debug(s);
581 } else if (logger.isInfoEnabled()) {
582 logger.info(s);
583 } else {
584
585 System.out.println(s);
586 }
587 } else {
588 if (isJvmError(s)) {
589 logger.error(s);
590 } else if (logger.isDebugEnabled()) {
591 logger.debug(s);
592 }
593
594 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
595 + arguments.getForkChannelId() + ".";
596 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
597 String dumpPath = dumpFile.getAbsolutePath();
598 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
599 }
600 }
601
602 private boolean isJvmError(String line) {
603 String lineLower = line.toLowerCase();
604 for (String errorPattern : JVM_ERROR_PATTERNS) {
605 if (lineLower.contains(errorPattern)) {
606 return true;
607 }
608 }
609 return false;
610 }
611 }
612
613
614
615
616 public static final class MalformedFrameException extends Exception {
617 private final int readFrom;
618 private final int readTo;
619
620 public MalformedFrameException(int readFrom, int readTo) {
621 this.readFrom = readFrom;
622 this.readTo = readTo;
623 }
624
625 public int readFrom() {
626 return readFrom;
627 }
628
629 public int readTo() {
630 return readTo;
631 }
632
633 public boolean hasValidPositions() {
634 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
635 }
636 }
637
638
639
640
641
642
643
644
/**
 * Outcome of a buffered read from the channel.
 */
645 public enum StreamReadStatus {
// Fewer bytes than requested are available and the channel has not ended.
646 UNDERFLOW,
// At least the requested number of bytes is available.
647 OVERFLOW,
// NOTE(review): not referenced in the visible part of this file; a premature end of
// stream is reported there with an EOFException instead.
648 EOF
649 }
650 }