View Javadoc
1   package org.apache.maven.surefire.api.stream;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
22  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
23  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
24  
25  import javax.annotation.Nonnegative;
26  import javax.annotation.Nonnull;
27  import java.io.EOFException;
28  import java.io.File;
29  import java.io.IOException;
30  import java.nio.Buffer;
31  import java.nio.ByteBuffer;
32  import java.nio.CharBuffer;
33  import java.nio.channels.ReadableByteChannel;
34  import java.nio.charset.Charset;
35  import java.nio.charset.CharsetDecoder;
36  import java.nio.charset.CoderResult;
37  import java.util.ArrayList;
38  import java.util.List;
39  import java.util.Map;
40  
41  import static java.lang.Math.max;
42  import static java.lang.Math.min;
43  import static java.nio.charset.CodingErrorAction.REPLACE;
44  import static java.nio.charset.StandardCharsets.US_ASCII;
45  import static java.util.Arrays.copyOf;
46  import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
47  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
48  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
49  import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50  
51  /**
52   * @param <M> message object
53   * @param <MT> enum describing the meaning of the message
54   * @param <ST> enum for segment type
55   */
56  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable
57  {
58      public static final int BUFFER_SIZE = 1024;
59  
60      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
61  
62      private static final String[] JVM_ERROR_PATTERNS = {
63          "could not create the java virtual machine", "error occurred during initialization", // of VM, of boot layer
64          "error:", // general errors
65          "could not reserve enough space", "could not allocate", "unable to allocate", // memory errors
66          "java.lang.module.findexception" // JPMS errors
67      };
68  
69      private static final byte[] DEFAULT_STREAM_ENCODING_BYTES = DEFAULT_STREAM_ENCODING.name().getBytes( US_ASCII );
70  
71      private static final int NO_POSITION = -1;
72      private static final int DELIMITER_LENGTH = 1;
73      private static final int BYTE_LENGTH = 1;
74      private static final int INT_LENGTH = 4;
75      private static final int LONG_LENGTH = 8;
76  
77      private final ReadableByteChannel channel;
78      private final ForkNodeArguments arguments;
79      private final Map<Segment, MT> messageTypes;
80      private final ConsoleLogger logger;
81  
82      protected AbstractStreamDecoder( @Nonnull ReadableByteChannel channel,
83                                       @Nonnull ForkNodeArguments arguments,
84                                       @Nonnull Map<Segment, MT> messageTypes )
85      {
86          this.channel = channel;
87          this.arguments = arguments;
88          this.messageTypes = messageTypes;
89          logger = arguments.getConsoleLogger();
90      }
91  
92      public abstract M decode( @Nonnull Memento memento ) throws MalformedChannelException, IOException;
93  
94      @Nonnull
95      protected abstract byte[] getEncodedMagicNumber();
96  
97      @Nonnull
98      protected abstract ST[] nextSegmentType( @Nonnull MT messageType );
99  
100     @Nonnull
101     protected abstract M toMessage( @Nonnull MT messageType, @Nonnull Memento memento )
102         throws MalformedFrameException;
103 
104     @Nonnull
105     protected final ForkNodeArguments getArguments()
106     {
107         return arguments;
108     }
109 
110     protected void debugStream( byte[] array, int position, int remaining )
111     {
112     }
113 
114     protected MT readMessageType( @Nonnull Memento memento ) throws IOException, MalformedFrameException
115     {
116         byte[] header = getEncodedMagicNumber();
117         int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
118         read( memento, readCount );
119         checkHeader( memento );
120         return messageTypes.get( readSegment( memento ) );
121     }
122 
123     @Nonnull
124     @SuppressWarnings( "checkstyle:magicnumber" )
125     protected Segment readSegment( @Nonnull Memento memento ) throws IOException, MalformedFrameException
126     {
127         int readCount = readByte( memento ) & 0xff;
128         read( memento, readCount + DELIMITER_LENGTH );
129         ByteBuffer bb = memento.getByteBuffer();
130         Segment segment = new Segment( bb.array(), bb.arrayOffset() + ( (Buffer) bb ).position(), readCount );
131         ( (Buffer) bb ).position( ( (Buffer) bb ).position() + readCount );
132         checkDelimiter( memento );
133         return segment;
134     }
135 
136     @Nonnull
137     @SuppressWarnings( "checkstyle:magicnumber" )
138     protected Charset readCharset( @Nonnull Memento memento ) throws IOException, MalformedFrameException
139     {
140         int length = readByte( memento ) & 0xff;
141         read( memento, length + DELIMITER_LENGTH );
142         ByteBuffer bb = memento.getByteBuffer();
143         byte[] array = bb.array();
144         int offset = bb.arrayOffset() + ( (Buffer) bb ).position();
145         ( (Buffer) bb ).position( ( (Buffer) bb ).position() + length );
146         boolean isDefaultEncoding = false;
147         if ( length == DEFAULT_STREAM_ENCODING_BYTES.length )
148         {
149             isDefaultEncoding = true;
150             for ( int i = 0; i < length; i++ )
151             {
152                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
153             }
154         }
155 
156         try
157         {
158             Charset charset =
159                 isDefaultEncoding
160                     ? DEFAULT_STREAM_ENCODING
161                     : Charset.forName( new String( array, offset, length, US_ASCII ) );
162 
163             checkDelimiter( memento );
164             return charset;
165         }
166         catch ( IllegalArgumentException e )
167         {
168             throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(), ( (Buffer) bb ).position() );
169         }
170     }
171 
172     protected String readString( @Nonnull Memento memento ) throws IOException, MalformedFrameException
173     {
174         ( (Buffer) memento.getCharBuffer() ).clear();
175         int readCount = readInt( memento );
176         if ( readCount < 0 )
177         {
178             throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
179                 ( (Buffer) memento.getByteBuffer() ).position() );
180         }
181         read( memento, readCount + DELIMITER_LENGTH );
182 
183         final String string;
184         if ( readCount == 0 )
185         {
186             string = "";
187         }
188         else if ( readCount == 1 )
189         {
190             read( memento, 1 );
191             byte oneChar = memento.getByteBuffer().get();
192             string = oneChar == 0 ? null : String.valueOf( (char) oneChar );
193         }
194         else
195         {
196             string = readString( memento, readCount );
197         }
198         read( memento, 1 );
199         checkDelimiter( memento );
200         return string;
201     }
202 
203     protected Integer readInteger( @Nonnull Memento memento ) throws IOException, MalformedFrameException
204     {
205         read( memento, BYTE_LENGTH );
206         boolean isNullObject = memento.getByteBuffer().get() == 0;
207         if ( isNullObject )
208         {
209             read( memento, DELIMITER_LENGTH );
210             checkDelimiter( memento );
211             return null;
212         }
213         return readInt( memento );
214     }
215 
216     protected byte readByte( @Nonnull Memento memento ) throws IOException, MalformedFrameException
217     {
218         read( memento, BYTE_LENGTH + DELIMITER_LENGTH );
219         byte b = memento.getByteBuffer().get();
220         checkDelimiter( memento );
221         return b;
222     }
223 
224     protected int readInt( @Nonnull Memento memento ) throws IOException, MalformedFrameException
225     {
226         read( memento, INT_LENGTH + DELIMITER_LENGTH );
227         int i = memento.getByteBuffer().getInt();
228         checkDelimiter( memento );
229         return i;
230     }
231 
232     protected Long readLong( @Nonnull Memento memento ) throws IOException, MalformedFrameException
233     {
234         read( memento, BYTE_LENGTH );
235         boolean isNullObject = memento.getByteBuffer().get() == 0;
236         if ( isNullObject )
237         {
238             read( memento, DELIMITER_LENGTH );
239             checkDelimiter( memento );
240             return null;
241         }
242         return readLongPrivate( memento );
243     }
244 
245     protected long readLongPrivate( @Nonnull Memento memento ) throws IOException, MalformedFrameException
246     {
247         read( memento, LONG_LENGTH + DELIMITER_LENGTH );
248         long num = memento.getByteBuffer().getLong();
249         checkDelimiter( memento );
250         return num;
251     }
252 
253     @SuppressWarnings( "checkstyle:magicnumber" )
254     protected final void checkDelimiter( Memento memento ) throws MalformedFrameException
255     {
256         ByteBuffer bb = memento.bb;
257         if ( ( 0xff & bb.get() ) != ':' )
258         {
259             throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(), ( (Buffer) bb ).position() );
260         }
261     }
262 
263     protected final void checkHeader( Memento memento ) throws MalformedFrameException
264     {
265         ByteBuffer bb = memento.bb;
266 
267         checkDelimiter( memento );
268 
269         int shift = 0;
270         try
271         {
272             byte[] header = getEncodedMagicNumber();
273             byte[] bbArray = bb.array();
274             for ( int start = bb.arrayOffset() + ( (Buffer) bb ).position(), length = header.length;
275                   shift < length; shift++ )
276             {
277                 if ( bbArray[shift + start] != header[shift] )
278                 {
279                     throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
280                         ( (Buffer) bb ).position() + shift );
281                 }
282             }
283         }
284         finally
285         {
286             ( (Buffer) bb ).position( ( (Buffer) bb ).position() + shift );
287         }
288 
289         checkDelimiter( memento );
290     }
291 
292     protected void checkArguments( Memento memento, int expectedDataElements )
293         throws MalformedFrameException
294     {
295         if ( memento.getData().size() != expectedDataElements )
296         {
297             throw new MalformedFrameException( memento.getLine().getPositionByteBuffer(),
298                 ( (Buffer) memento.getByteBuffer() ).position() );
299         }
300     }
301 
302     private String readString( @Nonnull final Memento memento, @Nonnegative final int totalBytes )
303         throws IOException, MalformedFrameException
304     {
305         memento.getDecoder().reset();
306         final CharBuffer output = memento.getCharBuffer();
307         ( (Buffer) output ).clear();
308         final ByteBuffer input = memento.getByteBuffer();
309         final List<String> strings = new ArrayList<>();
310         int countDecodedBytes = 0;
311         for ( boolean endOfInput = false; !endOfInput; )
312         {
313             final int bytesToRead = totalBytes - countDecodedBytes;
314             read( memento, bytesToRead );
315             int bytesToDecode = min( input.remaining(), bytesToRead );
316             final boolean isLastChunk = bytesToDecode == bytesToRead;
317             endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
318             do
319             {
320                 boolean endOfChunk = output.remaining() >= bytesToRead;
321                 boolean endOfOutput = isLastChunk && endOfChunk;
322                 int readInputBytes = decodeString( memento.getDecoder(), input, output, bytesToDecode, endOfOutput,
323                     memento.getLine().getPositionByteBuffer() );
324                 bytesToDecode -= readInputBytes;
325                 countDecodedBytes += readInputBytes;
326             }
327             while ( isLastChunk && bytesToDecode > 0 && output.hasRemaining() );
328 
329             strings.add( ( (Buffer) output ).flip().toString() );
330             ( (Buffer) output ).clear();
331         }
332 
333         memento.getDecoder().reset();
334         ( (Buffer) output ).clear();
335 
336         return toString( strings );
337     }
338 
339     private static int decodeString( @Nonnull CharsetDecoder decoder, @Nonnull ByteBuffer input,
340                                      @Nonnull CharBuffer output, @Nonnegative int bytesToDecode,
341                                      boolean endOfInput, @Nonnegative int errorStreamFrom )
342         throws MalformedFrameException
343     {
344         int limit = ( (Buffer) input ).limit();
345         ( (Buffer) input ).limit( ( (Buffer) input ).position() + bytesToDecode );
346 
347         CoderResult result = decoder.decode( input, output, endOfInput );
348         if ( result.isError() || result.isMalformed() )
349         {
350             throw new MalformedFrameException( errorStreamFrom, ( (Buffer) input ).position() );
351         }
352 
353         int decodedBytes = bytesToDecode - input.remaining();
354         ( (Buffer) input ).limit( limit );
355         return decodedBytes;
356     }
357 
358     private static String toString( List<String> strings )
359     {
360         if ( strings.size() == 1 )
361         {
362             return strings.get( 0 );
363         }
364         StringBuilder concatenated = new StringBuilder( strings.size() * BUFFER_SIZE );
365         for ( String s : strings )
366         {
367             concatenated.append( s );
368         }
369         return concatenated.toString();
370     }
371 
372     private void printCorruptedStream( Memento memento )
373     {
374         ByteBuffer bb = memento.getByteBuffer();
375         if ( bb.hasRemaining() )
376         {
377             int bytesToWrite = bb.remaining();
378             memento.getLine().write( bb, ( (Buffer) bb ).position(), bytesToWrite );
379             ( (Buffer) bb ).position( ( (Buffer) bb ).position() + bytesToWrite );
380         }
381     }
382 
383     /**
384      * Print the last string which has not been finished by a new line character.
385      *
386      * @param memento current memento object
387      */
388     protected final void printRemainingStream( Memento memento )
389     {
390         printCorruptedStream( memento );
391         memento.getLine().printExistingLine();
392         memento.getLine().clear();
393     }
394 
395     /**
396      *
397      */
398     public static final class Segment
399     {
400         private final byte[] array;
401         private final int fromIndex;
402         private final int length;
403         private final int hashCode;
404 
405         public Segment( byte[] array, int fromIndex, int length )
406         {
407             this.array = array;
408             this.fromIndex = fromIndex;
409             this.length = length;
410 
411             int hashCode = 0;
412             int i = fromIndex;
413             for ( int loops = length >> 1; loops-- != 0; )
414             {
415                 hashCode = 31 * hashCode + array[i++];
416                 hashCode = 31 * hashCode + array[i++];
417             }
418             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
419         }
420 
421         @Override
422         public int hashCode()
423         {
424             return hashCode;
425         }
426 
427         @Override
428         public boolean equals( Object obj )
429         {
430             if ( !( obj instanceof Segment ) )
431             {
432                 return false;
433             }
434 
435             Segment that = (Segment) obj;
436             if ( that.length != length )
437             {
438                 return false;
439             }
440 
441             for ( int i = 0; i < length; i++ )
442             {
443                 if ( that.array[that.fromIndex + i] != array[fromIndex + i] )
444                 {
445                     return false;
446                 }
447             }
448             return true;
449         }
450     }
451 
452     protected @Nonnull StreamReadStatus read( @Nonnull Memento memento, int recommendedCount ) throws IOException
453     {
454         ByteBuffer buffer = memento.getByteBuffer();
455         if ( buffer.remaining() >= recommendedCount && ( (Buffer) buffer ).limit() != 0 )
456         {
457             return OVERFLOW;
458         }
459         else
460         {
461             if ( ( (Buffer) buffer ).position() != 0
462                 && recommendedCount > buffer.capacity() - ( (Buffer) buffer ).position() )
463             {
464                 ( (Buffer) buffer.compact() ).flip();
465                 memento.getLine().setPositionByteBuffer( 0 );
466             }
467             int mark = ( (Buffer) buffer ).position();
468             ( (Buffer) buffer ).position( ( (Buffer) buffer ).limit() );
469             ( (Buffer) buffer ).limit( min( ( (Buffer) buffer ).position() + recommendedCount, buffer.capacity() ) );
470             return read( buffer, mark, recommendedCount );
471         }
472     }
473 
474     private StreamReadStatus read( ByteBuffer buffer, int oldPosition, int recommendedCount )
475         throws IOException
476     {
477         StreamReadStatus readStatus = null;
478         boolean isEnd = false;
479         try
480         {
481             while ( !isEnd && ( (Buffer) buffer ).position() - oldPosition < recommendedCount
482                 && ( (Buffer) buffer ).position() < ( (Buffer) buffer ).limit() )
483             {
484                 isEnd = channel.read( buffer ) == -1;
485             }
486         }
487         finally
488         {
489             ( (Buffer) buffer ).limit( ( (Buffer) buffer ).position() );
490             ( (Buffer) buffer ).position( oldPosition );
491             int readBytes = buffer.remaining();
492             boolean readComplete = readBytes >= recommendedCount;
493             if ( !isEnd || readComplete )
494             {
495                 debugStream( buffer.array(),
496                     buffer.arrayOffset() + ( (Buffer) buffer ).position(), buffer.remaining() );
497                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
498             }
499         }
500 
501         if ( readStatus == null )
502         {
503             throw new EOFException();
504         }
505         else
506         {
507             return readStatus;
508         }
509     }
510 
511     /**
512      *
513      */
514     public final class Memento
515     {
516         private CharsetDecoder currentDecoder;
517         private final CharsetDecoder defaultDecoder;
518         private final BufferedStream line = new BufferedStream( 32 );
519         private final List<Object> data = new ArrayList<>();
520         private final CharBuffer cb = CharBuffer.allocate( BUFFER_SIZE );
521         private final ByteBuffer bb = ByteBuffer.allocate( BUFFER_SIZE );
522 
523         public Memento()
524         {
525             defaultDecoder = DEFAULT_STREAM_ENCODING.newDecoder()
526                 .onMalformedInput( REPLACE )
527                 .onUnmappableCharacter( REPLACE );
528             ( (Buffer) bb ).limit( 0 );
529         }
530 
531         public void reset()
532         {
533             currentDecoder = null;
534             data.clear();
535         }
536 
537         public CharsetDecoder getDecoder()
538         {
539             return currentDecoder == null ? defaultDecoder : currentDecoder;
540         }
541 
542         public void setCharset( Charset charset )
543         {
544             if ( charset.name().equals( defaultDecoder.charset().name() ) )
545             {
546                 currentDecoder = defaultDecoder;
547             }
548             else
549             {
550                 currentDecoder = charset.newDecoder()
551                     .onMalformedInput( REPLACE )
552                     .onUnmappableCharacter( REPLACE );
553             }
554         }
555 
556         public BufferedStream getLine()
557         {
558             return line;
559         }
560 
561         public List<Object> getData()
562         {
563             return data;
564         }
565 
566         public <T> T ofDataAt( int indexOfData )
567         {
568             //noinspection unchecked
569             return (T) data.get( indexOfData );
570         }
571 
572         public CharBuffer getCharBuffer()
573         {
574             return cb;
575         }
576 
577         public ByteBuffer getByteBuffer()
578         {
579             return bb;
580         }
581     }
582 
583     /**
584      * This class avoids locking which gains the performance of this decoder.
585      */
586     public final class BufferedStream
587     {
588         private byte[] buffer;
589         private int count;
590         private int positionByteBuffer;
591         private boolean isNewLine;
592 
593         BufferedStream( int capacity )
594         {
595             this.buffer = new byte[capacity];
596         }
597 
598         public int getPositionByteBuffer()
599         {
600             return positionByteBuffer;
601         }
602 
603         public void setPositionByteBuffer( int positionByteBuffer )
604         {
605             this.positionByteBuffer = positionByteBuffer;
606         }
607 
608         public void write( ByteBuffer bb, int position, int length )
609         {
610             ensureCapacity( length );
611             byte[] array = bb.array();
612             int pos = bb.arrayOffset() + position;
613             while ( length-- > 0 )
614             {
615                 positionByteBuffer++;
616                 byte b = array[pos++];
617                 if ( b == '\r' || b == '\n' )
618                 {
619                     if ( !isNewLine )
620                     {
621                         printExistingLine();
622                         count = 0;
623                     }
624                     isNewLine = true;
625                 }
626                 else
627                 {
628                     buffer[count++] = b;
629                     isNewLine = false;
630                 }
631             }
632         }
633 
634         public void clear()
635         {
636             count = 0;
637         }
638 
639         @Override
640         public String toString()
641         {
642             return new String( buffer, 0, count, DEFAULT_STREAM_ENCODING );
643         }
644 
645         private boolean isEmpty()
646         {
647             return count == 0;
648         }
649 
650         private void ensureCapacity( int addCapacity )
651         {
652             int oldCapacity = buffer.length;
653             int exactCapacity = count + addCapacity;
654             if ( exactCapacity < 0 )
655             {
656                 throw new OutOfMemoryError();
657             }
658 
659             if ( oldCapacity < exactCapacity )
660             {
661                 int newCapacity = oldCapacity << 1;
662                 buffer = copyOf( buffer, max( newCapacity, exactCapacity ) );
663             }
664         }
665 
666         void printExistingLine()
667         {
668             if ( isEmpty() )
669             {
670                 return;
671             }
672 
673             String s = toString();
674             if ( isBlank( s ) )
675             {
676                 return;
677             }
678 
679             if ( s.contains( PRINTABLE_JVM_NATIVE_STREAM ) )
680             {
681                 if ( logger.isDebugEnabled() )
682                 {
683                     logger.debug( s );
684                 }
685                 else if ( logger.isInfoEnabled() )
686                 {
687                     logger.info( s );
688                 }
689                 else
690                 {
691                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
692                     System.out.println( s );
693                 }
694             }
695             else
696             {
697                 if ( isJvmError( s ) )
698                 {
699                     logger.error( s );
700                 }
701                 else if ( logger.isDebugEnabled() )
702                 {
703                     logger.debug( s );
704                 }
705 
706                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
707                     + arguments.getForkChannelId() + ".";
708                 File dumpFile = arguments.dumpStreamText( msg + " Stream '" + s + "'." );
709                 String dumpPath = dumpFile.getAbsolutePath();
710                 arguments.logWarningAtEnd( msg + " See FAQ web page and the dump file " + dumpPath );
711             }
712         }
713 
714         private boolean isJvmError( String line )
715         {
716             String lineLower = line.toLowerCase();
717             for ( String errorPattern : JVM_ERROR_PATTERNS )
718             {
719                 if ( lineLower.contains( errorPattern ) )
720                 {
721                     return true;
722                 }
723             }
724             return false;
725         }
726     }
727 
728     /**
729      *
730      */
731     public static final class MalformedFrameException extends Exception
732     {
733         private final int readFrom;
734         private final int readTo;
735 
736         public MalformedFrameException( int readFrom, int readTo )
737         {
738             this.readFrom = readFrom;
739             this.readTo = readTo;
740         }
741 
742         public int readFrom()
743         {
744             return readFrom;
745         }
746 
747         public int readTo()
748         {
749             return readTo;
750         }
751 
752         public boolean hasValidPositions()
753         {
754             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
755         }
756     }
757 
758     /**
759      * Underflow - could not completely read out al bytes in one call.
760      * <br>
761      * Overflow - read all bytes or more
762      * <br>
763      * EOF - end of stream
764      */
765     public enum StreamReadStatus
766     {
767         UNDERFLOW,
768         OVERFLOW,
769         EOF
770     }
771 }