1 package org.apache.maven.surefire.api.stream;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23 import org.apache.maven.surefire.api.fork.ForkNodeArguments;
24
25 import javax.annotation.Nonnegative;
26 import javax.annotation.Nonnull;
27 import java.io.EOFException;
28 import java.io.File;
29 import java.io.IOException;
30 import java.nio.Buffer;
31 import java.nio.ByteBuffer;
32 import java.nio.CharBuffer;
33 import java.nio.channels.ReadableByteChannel;
34 import java.nio.charset.Charset;
35 import java.nio.charset.CharsetDecoder;
36 import java.nio.charset.CoderResult;
37 import java.util.ArrayList;
38 import java.util.List;
39 import java.util.Map;
40
41 import static java.lang.Math.max;
42 import static java.lang.Math.min;
43 import static java.nio.charset.CodingErrorAction.REPLACE;
44 import static java.nio.charset.StandardCharsets.US_ASCII;
45 import static java.util.Arrays.copyOf;
46 import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
47 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
48 import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
49 import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50
51
52
53
54
55
56 public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable
57 {
/** Capacity used for both the byte buffer and the char buffer allocated by every {@link Memento}. */
public static final int BUFFER_SIZE = 1024;

/** A line containing this text is logged or printed as is instead of being reported as a corrupted channel. */
private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";

/** Lower-case fragments searched in the lower-cased line; a match makes the line be logged as an error. */
private static final String[] JVM_ERROR_PATTERNS = {
    "could not create the java virtual machine", "error occurred during initialization",
    "error:",
    "could not reserve enough space", "could not allocate", "unable to allocate",
    "java.lang.module.findexception"
};

/** US-ASCII bytes of the default encoding's name, compared byte by byte in {@link #readCharset(Memento)}. */
private static final byte[] DEFAULT_STREAM_ENCODING_BYTES = DEFAULT_STREAM_ENCODING.name().getBytes( US_ASCII );

/** Position value treated as "not set" by {@link MalformedFrameException#hasValidPositions()}. */
private static final int NO_POSITION = -1;
/** Number of bytes taken by the ':' delimiter that follows every element of a frame. */
private static final int DELIMITER_LENGTH = 1;
/** Number of bytes of an encoded byte value. */
private static final int BYTE_LENGTH = 1;
/** Number of bytes of an encoded int value. */
private static final int INT_LENGTH = 4;
/** Number of bytes of an encoded long value. */
private static final int LONG_LENGTH = 8;

/** Source of the encoded bytes. */
private final ReadableByteChannel channel;
/** Fork node arguments; used to dump and report lines that corrupt the channel. */
private final ForkNodeArguments arguments;
/** Lookup from the segment read by {@link #readMessageType(Memento)} to the message type. */
private final Map<Segment, MT> messageTypes;
/** Console logger obtained from {@link #arguments} in the constructor. */
private final ConsoleLogger logger;
81
/**
 * Creates a decoder which reads frames from the given channel.
 *
 * @param channel      source of the encoded bytes
 * @param arguments    fork node arguments; the console logger is taken from them
 * @param messageTypes lookup from an encoded segment to its message type
 */
protected AbstractStreamDecoder( @Nonnull ReadableByteChannel channel,
                                 @Nonnull ForkNodeArguments arguments,
                                 @Nonnull Map<Segment, MT> messageTypes )
{
    this.messageTypes = messageTypes;
    this.arguments = arguments;
    this.channel = channel;
    this.logger = arguments.getConsoleLogger();
}
91
/**
 * Decoding entry point implemented by subclasses; it is not called from within this class.
 * NOTE(review): presumably reads one complete message from the channel - confirm against the implementations.
 *
 * @param memento state shared between the read operations
 * @return the decoded message
 * @throws MalformedChannelException declared by the contract; the conditions are defined by the implementation
 * @throws IOException               declared by the contract; the conditions are defined by the implementation
 */
public abstract M decode( @Nonnull Memento memento ) throws MalformedChannelException, IOException;
93
/**
 * Supplies the bytes of the header which every frame must contain between its first two delimiters.
 * Used by {@link #readMessageType(Memento)} to size the read and by {@link #checkHeader(Memento)}
 * for the byte-by-byte comparison.
 *
 * @return the encoded magic number
 */
@Nonnull
protected abstract byte[] getEncodedMagicNumber();
96
/**
 * Implemented by subclasses; it is not called from within this class.
 * NOTE(review): presumably lists the segment types that follow the given message type in a frame - confirm.
 *
 * @param messageType the message type
 * @return array of segment types
 */
@Nonnull
protected abstract ST[] nextSegmentType( @Nonnull MT messageType );
99
/**
 * Implemented by subclasses; it is not called from within this class.
 * NOTE(review): presumably builds the message from the data collected in the memento - confirm.
 *
 * @param messageType the message type
 * @param memento     state shared between the read operations
 * @return the message
 * @throws MalformedFrameException declared by the contract; the conditions are defined by the implementation
 */
@Nonnull
protected abstract M toMessage( @Nonnull MT messageType, @Nonnull Memento memento )
    throws MalformedFrameException;
103
104 @Nonnull
105 protected final ForkNodeArguments getArguments()
106 {
107 return arguments;
108 }
109
/**
 * Hook invoked after the channel was read, see the private {@code read} method; does nothing by default.
 *
 * @param array     backing array of the byte buffer
 * @param position  index in {@code array} of the first unread byte
 * @param remaining number of unread bytes available from {@code position}
 */
protected void debugStream( byte[] array, int position, int remaining )
{
}
113
114 protected MT readMessageType( @Nonnull Memento memento ) throws IOException, MalformedFrameException
115 {
116 byte[] header = getEncodedMagicNumber();
117 int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
118 read( memento, readCount );
119 checkHeader( memento );
120 return messageTypes.get( readSegment( memento ) );
121 }
122
123 @Nonnull
124 @SuppressWarnings( "checkstyle:magicnumber" )
125 protected Segment readSegment( @Nonnull Memento memento ) throws IOException, MalformedFrameException
126 {
127 int readCount = readByte( memento ) & 0xff;
128 read( memento, readCount + DELIMITER_LENGTH );
129 ByteBuffer bb = memento.getByteBuffer();
130 Segment segment = new Segment( bb.array(), bb.arrayOffset() + ( (Buffer) bb ).position(), readCount );
131 ( (Buffer) bb ).position( ( (Buffer) bb ).position() + readCount );
132 checkDelimiter( memento );
133 return segment;
134 }
135
136 @Nonnull
137 @SuppressWarnings( "checkstyle:magicnumber" )
138 protected Charset readCharset( @Nonnull Memento memento ) throws IOException, MalformedFrameException
139 {
140 int length = readByte( memento ) & 0xff;
141 read( memento, length + DELIMITER_LENGTH );
142 ByteBuffer bb = memento.getByteBuffer();
143 byte[] array = bb.array();
144 int offset = bb.arrayOffset() + ( (Buffer) bb ).position();
145 ( (Buffer) bb ).position( ( (Buffer) bb ).position() + length );
146 boolean isDefaultEncoding = false;
147 if ( length == DEFAULT_STREAM_ENCODING_BYTES.length )
148 {
149 isDefaultEncoding = true;
150 for ( int i = 0; i < length; i++ )
151 {
152 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
153 }
154 }
155
156 try
157 {
158 Charset charset =
159 isDefaultEncoding
160 ? DEFAULT_STREAM_ENCODING
161 : Charset.forName( new String( array, offset, length, US_ASCII ) );
162
163 checkDelimiter( memento );
164 return charset;
165 }
166 catch ( IllegalArgumentException e )
167 {
168 throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(), ( (Buffer) bb ).position() );
169 }
170 }
171
172 protected String readString( @Nonnull Memento memento ) throws IOException, MalformedFrameException
173 {
174 ( (Buffer) memento.getCharBuffer() ).clear();
175 int readCount = readInt( memento );
176 if ( readCount < 0 )
177 {
178 throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
179 ( (Buffer) memento.getByteBuffer() ).position() );
180 }
181 read( memento, readCount + DELIMITER_LENGTH );
182
183 final String string;
184 if ( readCount == 0 )
185 {
186 string = "";
187 }
188 else if ( readCount == 1 )
189 {
190 read( memento, 1 );
191 byte oneChar = memento.getByteBuffer().get();
192 string = oneChar == 0 ? null : String.valueOf( (char) oneChar );
193 }
194 else
195 {
196 string = readString( memento, readCount );
197 }
198 read( memento, 1 );
199 checkDelimiter( memento );
200 return string;
201 }
202
203 protected Integer readInteger( @Nonnull Memento memento ) throws IOException, MalformedFrameException
204 {
205 read( memento, BYTE_LENGTH );
206 boolean isNullObject = memento.getByteBuffer().get() == 0;
207 if ( isNullObject )
208 {
209 read( memento, DELIMITER_LENGTH );
210 checkDelimiter( memento );
211 return null;
212 }
213 return readInt( memento );
214 }
215
216 protected byte readByte( @Nonnull Memento memento ) throws IOException, MalformedFrameException
217 {
218 read( memento, BYTE_LENGTH + DELIMITER_LENGTH );
219 byte b = memento.getByteBuffer().get();
220 checkDelimiter( memento );
221 return b;
222 }
223
224 protected int readInt( @Nonnull Memento memento ) throws IOException, MalformedFrameException
225 {
226 read( memento, INT_LENGTH + DELIMITER_LENGTH );
227 int i = memento.getByteBuffer().getInt();
228 checkDelimiter( memento );
229 return i;
230 }
231
232 protected Long readLong( @Nonnull Memento memento ) throws IOException, MalformedFrameException
233 {
234 read( memento, BYTE_LENGTH );
235 boolean isNullObject = memento.getByteBuffer().get() == 0;
236 if ( isNullObject )
237 {
238 read( memento, DELIMITER_LENGTH );
239 checkDelimiter( memento );
240 return null;
241 }
242 return readLongPrivate( memento );
243 }
244
245 protected long readLongPrivate( @Nonnull Memento memento ) throws IOException, MalformedFrameException
246 {
247 read( memento, LONG_LENGTH + DELIMITER_LENGTH );
248 long num = memento.getByteBuffer().getLong();
249 checkDelimiter( memento );
250 return num;
251 }
252
253 @SuppressWarnings( "checkstyle:magicnumber" )
254 protected final void checkDelimiter( Memento memento ) throws MalformedFrameException
255 {
256 ByteBuffer bb = memento.bb;
257 if ( ( 0xff & bb.get() ) != ':' )
258 {
259 throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(), ( (Buffer) bb ).position() );
260 }
261 }
262
263 protected final void checkHeader( Memento memento ) throws MalformedFrameException
264 {
265 ByteBuffer bb = memento.bb;
266
267 checkDelimiter( memento );
268
269 int shift = 0;
270 try
271 {
272 byte[] header = getEncodedMagicNumber();
273 byte[] bbArray = bb.array();
274 for ( int start = bb.arrayOffset() + ( (Buffer) bb ).position(), length = header.length;
275 shift < length; shift++ )
276 {
277 if ( bbArray[shift + start] != header[shift] )
278 {
279 throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
280 ( (Buffer) bb ).position() + shift );
281 }
282 }
283 }
284 finally
285 {
286 ( (Buffer) bb ).position( ( (Buffer) bb ).position() + shift );
287 }
288
289 checkDelimiter( memento );
290 }
291
292 protected void checkArguments( Memento memento, int expectedDataElements )
293 throws MalformedFrameException
294 {
295 if ( memento.getData().size() != expectedDataElements )
296 {
297 throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
298 ( (Buffer) memento.getByteBuffer() ).position() );
299 }
300 }
301
/**
 * Decodes {@code totalBytes} bytes of the stream to a string using the decoder of the memento.
 * The bytes are processed in chunks, because a read may deliver fewer bytes than requested;
 * every chunk is decoded to the shared char buffer and the partial strings are concatenated at the end.
 *
 * @param memento    state shared between the read operations
 * @param totalBytes number of encoded bytes of the string
 * @return the decoded string
 * @throws IOException             if the channel fails or ends
 * @throws MalformedFrameException if the decoder reports an error
 */
private String readString( @Nonnull final Memento memento, @Nonnegative final int totalBytes )
    throws IOException, MalformedFrameException
{
    memento.getDecoder().reset();
    final CharBuffer output = memento.getCharBuffer();
    ( (Buffer) output ).clear();
    final ByteBuffer input = memento.getByteBuffer();
    final List<String> strings = new ArrayList<>();
    int countDecodedBytes = 0;
    for ( boolean endOfInput = false; !endOfInput; )
    {
        // Ask for everything which is still missing; the buffer may deliver less.
        final int bytesToRead = totalBytes - countDecodedBytes;
        read( memento, bytesToRead );
        int bytesToDecode = min( input.remaining(), bytesToRead );
        // True when the buffer holds all the bytes which are still missing.
        final boolean isLastChunk = bytesToDecode == bytesToRead;
        endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
        do
        {
            // The decoder is told about the end of input only if the last chunk is available
            // and the char buffer has at least as many free chars as there were bytes to read.
            boolean endOfChunk = output.remaining() >= bytesToRead;
            boolean endOfOutput = isLastChunk && endOfChunk;
            int readInputBytes = decodeString( memento.getDecoder(), input, output, bytesToDecode, endOfOutput,
                memento.getLine().getPositionByteBuffer() );
            bytesToDecode -= readInputBytes;
            countDecodedBytes += readInputBytes;
        }
        while ( isLastChunk && bytesToDecode > 0 && output.hasRemaining() );

        // Store the decoded chunk and recycle the char buffer for the next one.
        strings.add( ( (Buffer) output ).flip().toString() );
        ( (Buffer) output ).clear();
    }

    memento.getDecoder().reset();
    ( (Buffer) output ).clear();

    return toString( strings );
}
338
339 private static int decodeString( @Nonnull CharsetDecoder decoder, @Nonnull ByteBuffer input,
340 @Nonnull CharBuffer output, @Nonnegative int bytesToDecode,
341 boolean endOfInput, @Nonnegative int errorStreamFrom )
342 throws MalformedFrameException
343 {
344 int limit = ( (Buffer) input ).limit();
345 ( (Buffer) input ).limit( ( (Buffer) input ).position() + bytesToDecode );
346
347 CoderResult result = decoder.decode( input, output, endOfInput );
348 if ( result.isError() || result.isMalformed() )
349 {
350 throw new MalformedFrameException( errorStreamFrom, ( (Buffer) input ).position() );
351 }
352
353 int decodedBytes = bytesToDecode - input.remaining();
354 ( (Buffer) input ).limit( limit );
355 return decodedBytes;
356 }
357
358 private static String toString( List<String> strings )
359 {
360 if ( strings.size() == 1 )
361 {
362 return strings.get( 0 );
363 }
364 StringBuilder concatenated = new StringBuilder( strings.size() * BUFFER_SIZE );
365 for ( String s : strings )
366 {
367 concatenated.append( s );
368 }
369 return concatenated.toString();
370 }
371
372 private void printCorruptedStream( Memento memento )
373 {
374 ByteBuffer bb = memento.getByteBuffer();
375 if ( bb.hasRemaining() )
376 {
377 int bytesToWrite = bb.remaining();
378 memento.getLine().write( bb, ( (Buffer) bb ).position(), bytesToWrite );
379 ( (Buffer) bb ).position( ( (Buffer) bb ).position() + bytesToWrite );
380 }
381 }
382
383
384
385
386
387
388 protected final void printRemainingStream( Memento memento )
389 {
390 printCorruptedStream( memento );
391 memento.getLine().printExistingLine();
392 memento.getLine().clear();
393 }
394
395
396
397
398 public static final class Segment
399 {
400 private final byte[] array;
401 private final int fromIndex;
402 private final int length;
403 private final int hashCode;
404
405 public Segment( byte[] array, int fromIndex, int length )
406 {
407 this.array = array;
408 this.fromIndex = fromIndex;
409 this.length = length;
410
411 int hashCode = 0;
412 int i = fromIndex;
413 for ( int loops = length >> 1; loops-- != 0; )
414 {
415 hashCode = 31 * hashCode + array[i++];
416 hashCode = 31 * hashCode + array[i++];
417 }
418 this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
419 }
420
421 @Override
422 public int hashCode()
423 {
424 return hashCode;
425 }
426
427 @Override
428 public boolean equals( Object obj )
429 {
430 if ( !( obj instanceof Segment ) )
431 {
432 return false;
433 }
434
435 Segment that = (Segment) obj;
436 if ( that.length != length )
437 {
438 return false;
439 }
440
441 for ( int i = 0; i < length; i++ )
442 {
443 if ( that.array[that.fromIndex + i] != array[fromIndex + i] )
444 {
445 return false;
446 }
447 }
448 return true;
449 }
450 }
451
452 protected @Nonnull StreamReadStatus read( @Nonnull Memento memento, int recommendedCount ) throws IOException
453 {
454 ByteBuffer buffer = memento.getByteBuffer();
455 if ( buffer.remaining() >= recommendedCount && ( (Buffer) buffer ).limit() != 0 )
456 {
457 return OVERFLOW;
458 }
459 else
460 {
461 if ( ( (Buffer) buffer ).position() != 0
462 && recommendedCount > buffer.capacity() - ( (Buffer) buffer ).position() )
463 {
464 ( (Buffer) buffer.compact() ).flip();
465 memento.getLine().setPositionByteBuffer( 0 );
466 }
467 int mark = ( (Buffer) buffer ).position();
468 ( (Buffer) buffer ).position( ( (Buffer) buffer ).limit() );
469 ( (Buffer) buffer ).limit( min( ( (Buffer) buffer ).position() + recommendedCount, buffer.capacity() ) );
470 return read( buffer, mark, recommendedCount );
471 }
472 }
473
/**
 * Reads the channel to the buffer until the requested count of bytes is available from
 * {@code oldPosition}, the limit of the buffer is reached, or the channel ends.
 * On return the buffer is readable again: the position is {@code oldPosition} and the limit
 * is the end of the data.
 *
 * @param buffer           buffer positioned at the end of the data, with the limit set by the caller
 * @param oldPosition      position of the first unread byte
 * @param recommendedCount requested number of unread bytes
 * @return {@code OVERFLOW} if at least {@code recommendedCount} bytes are available,
 *         {@code UNDERFLOW} if fewer bytes are available and the channel has not ended
 * @throws IOException if the channel fails, or {@link EOFException} if the channel ended
 *                     before the requested count was available
 */
private StreamReadStatus read( ByteBuffer buffer, int oldPosition, int recommendedCount )
    throws IOException
{
    StreamReadStatus readStatus = null;
    boolean isEnd = false;
    try
    {
        while ( !isEnd && ( (Buffer) buffer ).position() - oldPosition < recommendedCount
            && ( (Buffer) buffer ).position() < ( (Buffer) buffer ).limit() )
        {
            isEnd = channel.read( buffer ) == -1;
        }
    }
    finally
    {
        // Runs also if the channel throws: switch the buffer back to reading mode.
        ( (Buffer) buffer ).limit( ( (Buffer) buffer ).position() );
        ( (Buffer) buffer ).position( oldPosition );
        int readBytes = buffer.remaining();
        boolean readComplete = readBytes >= recommendedCount;
        // The status stays null only if the channel ended with fewer bytes than requested.
        if ( !isEnd || readComplete )
        {
            debugStream( buffer.array(),
                buffer.arrayOffset() + ( (Buffer) buffer ).position(), buffer.remaining() );
            readStatus = readComplete ? OVERFLOW : UNDERFLOW;
        }
    }

    if ( readStatus == null )
    {
        throw new EOFException();
    }
    else
    {
        return readStatus;
    }
}
510
511
512
513
514 public final class Memento
515 {
516 private CharsetDecoder currentDecoder;
517 private final CharsetDecoder defaultDecoder;
518 private final BufferedStream line = new BufferedStream( 32 );
519 private final List<Object> data = new ArrayList<>();
520 private final CharBuffer cb = CharBuffer.allocate( BUFFER_SIZE );
521 private final ByteBuffer bb = ByteBuffer.allocate( BUFFER_SIZE );
522
523 public Memento()
524 {
525 defaultDecoder = DEFAULT_STREAM_ENCODING.newDecoder()
526 .onMalformedInput( REPLACE )
527 .onUnmappableCharacter( REPLACE );
528 ( (Buffer) bb ).limit( 0 );
529 }
530
531 public void reset()
532 {
533 currentDecoder = null;
534 data.clear();
535 }
536
537 public CharsetDecoder getDecoder()
538 {
539 return currentDecoder == null ? defaultDecoder : currentDecoder;
540 }
541
542 public void setCharset( Charset charset )
543 {
544 if ( charset.name().equals( defaultDecoder.charset().name() ) )
545 {
546 currentDecoder = defaultDecoder;
547 }
548 else
549 {
550 currentDecoder = charset.newDecoder()
551 .onMalformedInput( REPLACE )
552 .onUnmappableCharacter( REPLACE );
553 }
554 }
555
556 public BufferedStream getLine()
557 {
558 return line;
559 }
560
561 public List<Object> getData()
562 {
563 return data;
564 }
565
566 public <T> T ofDataAt( int indexOfData )
567 {
568
569 return (T) data.get( indexOfData );
570 }
571
572 public CharBuffer getCharBuffer()
573 {
574 return cb;
575 }
576
577 public ByteBuffer getByteBuffer()
578 {
579 return bb;
580 }
581 }
582
583
584
585
586 public final class BufferedStream
587 {
588 private byte[] buffer;
589 private int count;
590 private int positionByteBuffer;
591 private boolean isNewLine;
592
593 BufferedStream( int capacity )
594 {
595 this.buffer = new byte[capacity];
596 }
597
598 public int getPositionByteBuffer()
599 {
600 return positionByteBuffer;
601 }
602
603 public void setPositionByteBuffer( int positionByteBuffer )
604 {
605 this.positionByteBuffer = positionByteBuffer;
606 }
607
608 public void write( ByteBuffer bb, int position, int length )
609 {
610 ensureCapacity( length );
611 byte[] array = bb.array();
612 int pos = bb.arrayOffset() + position;
613 while ( length-- > 0 )
614 {
615 positionByteBuffer++;
616 byte b = array[pos++];
617 if ( b == '\r' || b == '\n' )
618 {
619 if ( !isNewLine )
620 {
621 printExistingLine();
622 count = 0;
623 }
624 isNewLine = true;
625 }
626 else
627 {
628 buffer[count++] = b;
629 isNewLine = false;
630 }
631 }
632 }
633
634 public void clear()
635 {
636 count = 0;
637 }
638
639 @Override
640 public String toString()
641 {
642 return new String( buffer, 0, count, DEFAULT_STREAM_ENCODING );
643 }
644
645 private boolean isEmpty()
646 {
647 return count == 0;
648 }
649
650 private void ensureCapacity( int addCapacity )
651 {
652 int oldCapacity = buffer.length;
653 int exactCapacity = count + addCapacity;
654 if ( exactCapacity < 0 )
655 {
656 throw new OutOfMemoryError();
657 }
658
659 if ( oldCapacity < exactCapacity )
660 {
661 int newCapacity = oldCapacity << 1;
662 buffer = copyOf( buffer, max( newCapacity, exactCapacity ) );
663 }
664 }
665
666 void printExistingLine()
667 {
668 if ( isEmpty() )
669 {
670 return;
671 }
672
673 String s = toString();
674 if ( isBlank( s ) )
675 {
676 return;
677 }
678
679 if ( s.contains( PRINTABLE_JVM_NATIVE_STREAM ) )
680 {
681 if ( logger.isDebugEnabled() )
682 {
683 logger.debug( s );
684 }
685 else if ( logger.isInfoEnabled() )
686 {
687 logger.info( s );
688 }
689 else
690 {
691
692 System.out.println( s );
693 }
694 }
695 else
696 {
697 if ( isJvmError( s ) )
698 {
699 logger.error( s );
700 }
701 else if ( logger.isDebugEnabled() )
702 {
703 logger.debug( s );
704 }
705
706 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
707 + arguments.getForkChannelId() + ".";
708 File dumpFile = arguments.dumpStreamText( msg + " Stream '" + s + "'." );
709 String dumpPath = dumpFile.getAbsolutePath();
710 arguments.logWarningAtEnd( msg + " See FAQ web page and the dump file " + dumpPath );
711 }
712 }
713
714 private boolean isJvmError( String line )
715 {
716 String lineLower = line.toLowerCase();
717 for ( String errorPattern : JVM_ERROR_PATTERNS )
718 {
719 if ( lineLower.contains( errorPattern ) )
720 {
721 return true;
722 }
723 }
724 return false;
725 }
726 }
727
728
729
730
731 public static final class MalformedFrameException extends Exception
732 {
733 private final int readFrom;
734 private final int readTo;
735
736 public MalformedFrameException( int readFrom, int readTo )
737 {
738 this.readFrom = readFrom;
739 this.readTo = readTo;
740 }
741
742 public int readFrom()
743 {
744 return readFrom;
745 }
746
747 public int readTo()
748 {
749 return readTo;
750 }
751
752 public boolean hasValidPositions()
753 {
754 return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
755 }
756 }
757
758
759
760
761
762
763
764
/**
 * Result of reading the channel to the byte buffer.
 */
public enum StreamReadStatus
{
    /** Fewer bytes than requested are available and the end of the channel was not reached. */
    UNDERFLOW,
    /** At least the requested number of bytes is available. */
    OVERFLOW,
    /** Not returned by the read methods of this class; they throw {@link EOFException} instead. */
    EOF
}
771 }