View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.stream;
20  
21  import javax.annotation.Nonnegative;
22  import javax.annotation.Nonnull;
23  
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.nio.CharBuffer;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.charset.Charset;
31  import java.nio.charset.CharsetDecoder;
32  import java.nio.charset.CoderResult;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39  
40  import static java.lang.Math.max;
41  import static java.lang.Math.min;
42  import static java.nio.charset.CodingErrorAction.REPLACE;
43  import static java.nio.charset.StandardCharsets.US_ASCII;
44  import static java.util.Arrays.copyOf;
45  import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48  import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49  
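/**
 * Base class of decoders which read framed messages from a {@link ReadableByteChannel}.
 * The frame header consists of a magic number enclosed by the delimiter {@code ':'}, and every
 * element read by this class is followed by the same delimiter.
 *
 * @param <M> message object
 * @param <MT> enum describing the meaning of the message
 * @param <ST> enum for segment type
 */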
55  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
    /** Capacity of the byte buffer and the char buffer allocated by every {@link Memento}. */
    public static final int BUFFER_SIZE = 1024;

    // A native stream line containing this text is logged as a regular message
    // instead of being reported as a corrupted channel.
    private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";

    // Lower-case fragments. A native stream line which contains any of them
    // (after converting the line to lower case) is logged as an error.
    private static final String[] JVM_ERROR_PATTERNS = {
        "could not create the java virtual machine",
        "error occurred during initialization", // of VM, of boot layer
        "error:", // general errors
        "could not reserve enough space",
        "could not allocate",
        "unable to allocate", // memory errors
        "java.lang.module.findexception" // JPMS errors
    };

    // Name of the default stream encoding as ASCII bytes; readCharset() compares
    // the received bytes with it before falling back to Charset.forName().
    private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
            DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);

    // Marker of an unknown position, see MalformedFrameException#hasValidPositions().
    private static final int NO_POSITION = -1;
    // Sizes (in bytes) of the elements of a frame.
    private static final int DELIMITER_LENGTH = 1;
    private static final int BYTE_LENGTH = 1;
    private static final int INT_LENGTH = 4;
    private static final int LONG_LENGTH = 8;

    // Source of the encoded frames.
    private final ReadableByteChannel channel;
    private final ForkNodeArguments arguments;
    // Lookup of the message type by the segment read right after the frame header.
    private final Map<Segment, MT> messageTypes;
    // Assigned from arguments.getConsoleLogger() in the constructor.
    private final ConsoleLogger logger;
83  
84      protected AbstractStreamDecoder(
85              @Nonnull ReadableByteChannel channel,
86              @Nonnull ForkNodeArguments arguments,
87              @Nonnull Map<Segment, MT> messageTypes) {
88          this.channel = channel;
89          this.arguments = arguments;
90          this.messageTypes = messageTypes;
91          logger = arguments.getConsoleLogger();
92      }
93  
    /**
     * Decodes one message.
     * NOTE(review): the exact contract is defined by the subclasses, which are not part of this file.
     *
     * @param memento decoder state and buffers used while reading
     * @return the decoded message
     * @throws MalformedChannelException declared for the implementation
     * @throws IOException declared for the implementation
     */
    public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;

    /**
     * @return bytes of the magic number expected in the frame header; {@link #checkHeader(Memento)}
     * compares them with the received bytes
     */
    @Nonnull
    protected abstract byte[] getEncodedMagicNumber();

    /**
     * NOTE(review): not called within this class; presumably returns the segment types which follow
     * the given message type - confirm against the subclasses.
     *
     * @param messageType message type
     * @return segment types
     */
    @Nonnull
    protected abstract ST[] nextSegmentType(@Nonnull MT messageType);

    /**
     * NOTE(review): not called within this class; presumably builds the message from the data
     * collected in the memento - confirm against the subclasses.
     *
     * @param messageType message type
     * @param memento decoder state
     * @return the message
     * @throws MalformedFrameException declared for the implementation
     */
    @Nonnull
    protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104 
105     @Nonnull
106     protected final ForkNodeArguments getArguments() {
107         return arguments;
108     }
109 
    /**
     * Hook called after bytes were read from the channel. The default implementation does nothing.
     *
     * @param array backing array of the byte buffer
     * @param position index in {@code array} of the first readable byte
     * @param remaining number of readable bytes
     */
    protected void debugStream(byte[] array, int position, int remaining) {}
111 
112     protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113         byte[] header = getEncodedMagicNumber();
114         int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
115         read(memento, readCount);
116         checkHeader(memento);
117         return messageTypes.get(readSegment(memento));
118     }
119 
120     @Nonnull
121     @SuppressWarnings("checkstyle:magicnumber")
122     protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
123         int readCount = readByte(memento) & 0xff;
124         read(memento, readCount + DELIMITER_LENGTH);
125         ByteBuffer bb = memento.getByteBuffer();
126         Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
127         bb.position(bb.position() + readCount);
128         checkDelimiter(memento);
129         return segment;
130     }
131 
132     @Nonnull
133     @SuppressWarnings("checkstyle:magicnumber")
134     protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
135         int length = readByte(memento) & 0xff;
136         read(memento, length + DELIMITER_LENGTH);
137         ByteBuffer bb = memento.getByteBuffer();
138         byte[] array = bb.array();
139         int offset = bb.arrayOffset() + bb.position();
140         bb.position(bb.position() + length);
141         boolean isDefaultEncoding = false;
142         if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
143             isDefaultEncoding = true;
144             for (int i = 0; i < length; i++) {
145                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
146             }
147         }
148 
149         try {
150             Charset charset = isDefaultEncoding
151                     ? DEFAULT_STREAM_ENCODING
152                     : Charset.forName(new String(array, offset, length, US_ASCII));
153 
154             checkDelimiter(memento);
155             return charset;
156         } catch (IllegalArgumentException e) {
157             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
158         }
159     }
160 
161     protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
162         (memento.getCharBuffer()).clear();
163         int readCount = readInt(memento);
164         if (readCount < 0) {
165             throw new MalformedFrameException(
166                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
167         }
168         read(memento, readCount + DELIMITER_LENGTH);
169 
170         final String string;
171         if (readCount == 0) {
172             string = "";
173         } else if (readCount == 1) {
174             read(memento, 1);
175             byte oneChar = memento.getByteBuffer().get();
176             string = oneChar == 0 ? null : String.valueOf((char) oneChar);
177         } else {
178             string = readString(memento, readCount);
179         }
180         read(memento, 1);
181         checkDelimiter(memento);
182         return string;
183     }
184 
185     protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
186         read(memento, BYTE_LENGTH);
187         boolean isNullObject = memento.getByteBuffer().get() == 0;
188         if (isNullObject) {
189             read(memento, DELIMITER_LENGTH);
190             checkDelimiter(memento);
191             return null;
192         }
193         return readInt(memento);
194     }
195 
196     protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
197         read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
198         byte b = memento.getByteBuffer().get();
199         checkDelimiter(memento);
200         return b;
201     }
202 
203     protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
204         read(memento, INT_LENGTH + DELIMITER_LENGTH);
205         int i = memento.getByteBuffer().getInt();
206         checkDelimiter(memento);
207         return i;
208     }
209 
210     protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
211         read(memento, BYTE_LENGTH);
212         boolean isNullObject = memento.getByteBuffer().get() == 0;
213         if (isNullObject) {
214             read(memento, DELIMITER_LENGTH);
215             checkDelimiter(memento);
216             return null;
217         }
218         return readLongPrivate(memento);
219     }
220 
221     protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
222         read(memento, LONG_LENGTH + DELIMITER_LENGTH);
223         long num = memento.getByteBuffer().getLong();
224         checkDelimiter(memento);
225         return num;
226     }
227 
228     @SuppressWarnings("checkstyle:magicnumber")
229     protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
230         ByteBuffer bb = memento.bb;
231         if ((0xff & bb.get()) != ':') {
232             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
233         }
234     }
235 
236     protected final void checkHeader(Memento memento) throws MalformedFrameException {
237         ByteBuffer bb = memento.bb;
238 
239         checkDelimiter(memento);
240 
241         int shift = 0;
242         try {
243             byte[] header = getEncodedMagicNumber();
244             byte[] bbArray = bb.array();
245             for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
246                 if (bbArray[shift + start] != header[shift]) {
247                     throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
248                 }
249             }
250         } finally {
251             bb.position(bb.position() + shift);
252         }
253 
254         checkDelimiter(memento);
255     }
256 
257     protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
258         if (memento.getData().size() != expectedDataElements) {
259             throw new MalformedFrameException(
260                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
261         }
262     }
263 
    /**
     * Decodes {@code totalBytes} bytes to a string with the decoder of the memento. The bytes are
     * read and decoded in chunks; the decoded chunks are concatenated at the end.
     *
     * @param memento decoder state and buffers
     * @param totalBytes number of encoded bytes of the string
     * @return the decoded string
     * @throws IOException if reading from the channel fails
     * @throws MalformedFrameException if the decoder reports an error
     */
    private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
            throws IOException, MalformedFrameException {
        memento.getDecoder().reset();
        final CharBuffer output = memento.getCharBuffer();
        (output).clear();
        final ByteBuffer input = memento.getByteBuffer();
        // one string per decoded chunk
        final List<String> strings = new ArrayList<>();
        int countDecodedBytes = 0;
        for (boolean endOfInput = false; !endOfInput; ) {
            // ask for all bytes which have not been decoded yet
            final int bytesToRead = totalBytes - countDecodedBytes;
            read(memento, bytesToRead);
            // the byte buffer may hold fewer bytes than requested
            int bytesToDecode = min(input.remaining(), bytesToRead);
            // true if all outstanding bytes are available in the byte buffer
            final boolean isLastChunk = bytesToDecode == bytesToRead;
            // evaluated before decoding: the outer loop ends after this iteration
            endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
            do {
                boolean endOfChunk = output.remaining() >= bytesToRead;
                // passed to the decoder as its "end of input" flag
                boolean endOfOutput = isLastChunk && endOfChunk;
                int readInputBytes = decodeString(
                        memento.getDecoder(),
                        input,
                        output,
                        bytesToDecode,
                        endOfOutput,
                        memento.getLine().getPositionByteBuffer());
                bytesToDecode -= readInputBytes;
                countDecodedBytes += readInputBytes;
            } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());

            // store the decoded characters and make the char buffer reusable
            strings.add((output).flip().toString());
            (output).clear();
        }

        // leave the decoder and the char buffer clean for the next string
        memento.getDecoder().reset();
        (output).clear();

        return toString(strings);
    }
301 
302     private static int decodeString(
303             @Nonnull CharsetDecoder decoder,
304             @Nonnull ByteBuffer input,
305             @Nonnull CharBuffer output,
306             @Nonnegative int bytesToDecode,
307             boolean endOfInput,
308             @Nonnegative int errorStreamFrom)
309             throws MalformedFrameException {
310         int limit = (input).limit();
311         (input).limit((input).position() + bytesToDecode);
312 
313         CoderResult result = decoder.decode(input, output, endOfInput);
314         if (result.isError() || result.isMalformed()) {
315             throw new MalformedFrameException(errorStreamFrom, (input).position());
316         }
317 
318         int decodedBytes = bytesToDecode - input.remaining();
319         (input).limit(limit);
320         return decodedBytes;
321     }
322 
323     private static String toString(List<String> strings) {
324         if (strings.size() == 1) {
325             return strings.get(0);
326         }
327         StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
328         for (String s : strings) {
329             concatenated.append(s);
330         }
331         return concatenated.toString();
332     }
333 
334     private void printCorruptedStream(Memento memento) {
335         ByteBuffer bb = memento.getByteBuffer();
336         if (bb.hasRemaining()) {
337             int bytesToWrite = bb.remaining();
338             memento.getLine().write(bb, bb.position(), bytesToWrite);
339             bb.position(bb.position() + bytesToWrite);
340         }
341     }
342 
343     /**
344      * Print the last string which has not been finished by a new line character.
345      *
346      * @param memento current memento object
347      */
348     protected final void printRemainingStream(Memento memento) {
349         printCorruptedStream(memento);
350         memento.getLine().printExistingLine();
351         memento.getLine().clear();
352     }
353 
354     public static final class Segment {
355         private final byte[] array;
356         private final int fromIndex;
357         private final int length;
358         private final int hashCode;
359 
360         public Segment(byte[] array, int fromIndex, int length) {
361             this.array = array;
362             this.fromIndex = fromIndex;
363             this.length = length;
364 
365             int hashCode = 0;
366             int i = fromIndex;
367             for (int loops = length >> 1; loops-- != 0; ) {
368                 hashCode = 31 * hashCode + array[i++];
369                 hashCode = 31 * hashCode + array[i++];
370             }
371             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
372         }
373 
374         @Override
375         public int hashCode() {
376             return hashCode;
377         }
378 
379         @Override
380         public boolean equals(Object obj) {
381             if (!(obj instanceof Segment)) {
382                 return false;
383             }
384 
385             Segment that = (Segment) obj;
386             if (that.length != length) {
387                 return false;
388             }
389 
390             for (int i = 0; i < length; i++) {
391                 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
392                     return false;
393                 }
394             }
395             return true;
396         }
397     }
398 
399     protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
400         ByteBuffer buffer = memento.getByteBuffer();
401         if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
402             return OVERFLOW;
403         } else {
404             if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
405                 (buffer.compact()).flip();
406                 memento.getLine().setPositionByteBuffer(0);
407             }
408             int mark = buffer.position();
409             buffer.position(buffer.limit());
410             buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
411             return read(buffer, mark, recommendedCount);
412         }
413     }
414 
415     private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
416         StreamReadStatus readStatus = null;
417         boolean isEnd = false;
418         try {
419             while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
420                 isEnd = channel.read(buffer) == -1;
421             }
422         } finally {
423             buffer.limit(buffer.position());
424             buffer.position(oldPosition);
425             int readBytes = buffer.remaining();
426             boolean readComplete = readBytes >= recommendedCount;
427             if (!isEnd || readComplete) {
428                 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
429                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
430             }
431         }
432 
433         if (readStatus == null) {
434             throw new EOFException();
435         } else {
436             return readStatus;
437         }
438     }
439 
440     public final class Memento {
441         private CharsetDecoder currentDecoder;
442         private final CharsetDecoder defaultDecoder;
443         private final BufferedStream line = new BufferedStream(32);
444         private final List<Object> data = new ArrayList<>();
445         private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
446         private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
447 
448         public Memento() {
449             defaultDecoder = DEFAULT_STREAM_ENCODING
450                     .newDecoder()
451                     .onMalformedInput(REPLACE)
452                     .onUnmappableCharacter(REPLACE);
453             bb.limit(0);
454         }
455 
456         public void reset() {
457             currentDecoder = null;
458             data.clear();
459         }
460 
461         public CharsetDecoder getDecoder() {
462             return currentDecoder == null ? defaultDecoder : currentDecoder;
463         }
464 
465         public void setCharset(Charset charset) {
466             if (charset.name().equals(defaultDecoder.charset().name())) {
467                 currentDecoder = defaultDecoder;
468             } else {
469                 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
470             }
471         }
472 
473         public BufferedStream getLine() {
474             return line;
475         }
476 
477         public List<Object> getData() {
478             return data;
479         }
480 
481         public <T> T ofDataAt(int indexOfData) {
482             //noinspection unchecked
483             return (T) data.get(indexOfData);
484         }
485 
486         public CharBuffer getCharBuffer() {
487             return cb;
488         }
489 
490         public ByteBuffer getByteBuffer() {
491             return bb;
492         }
493     }
494 
495     /**
496      * This class avoids locking which gains the performance of this decoder.
497      */
498     public final class BufferedStream {
499         private byte[] buffer;
500         private int count;
501         private int positionByteBuffer;
502         private boolean isNewLine;
503 
504         BufferedStream(int capacity) {
505             this.buffer = new byte[capacity];
506         }
507 
508         public int getPositionByteBuffer() {
509             return positionByteBuffer;
510         }
511 
512         public void setPositionByteBuffer(int positionByteBuffer) {
513             this.positionByteBuffer = positionByteBuffer;
514         }
515 
516         public void write(ByteBuffer bb, int position, int length) {
517             ensureCapacity(length);
518             byte[] array = bb.array();
519             int pos = bb.arrayOffset() + position;
520             while (length-- > 0) {
521                 positionByteBuffer++;
522                 byte b = array[pos++];
523                 if (b == '\r' || b == '\n') {
524                     if (!isNewLine) {
525                         printExistingLine();
526                         count = 0;
527                     }
528                     isNewLine = true;
529                 } else {
530                     buffer[count++] = b;
531                     isNewLine = false;
532                 }
533             }
534         }
535 
536         public void clear() {
537             count = 0;
538         }
539 
540         @Override
541         public String toString() {
542             return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
543         }
544 
545         private boolean isEmpty() {
546             return count == 0;
547         }
548 
549         private void ensureCapacity(int addCapacity) {
550             int oldCapacity = buffer.length;
551             int exactCapacity = count + addCapacity;
552             if (exactCapacity < 0) {
553                 throw new OutOfMemoryError();
554             }
555 
556             if (oldCapacity < exactCapacity) {
557                 int newCapacity = oldCapacity << 1;
558                 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
559             }
560         }
561 
562         void printExistingLine() {
563             if (isEmpty()) {
564                 return;
565             }
566 
567             String s = toString();
568             if (isBlank(s)) {
569                 return;
570             }
571 
572             if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
573                 if (logger.isDebugEnabled()) {
574                     logger.debug(s);
575                 } else if (logger.isInfoEnabled()) {
576                     logger.info(s);
577                 } else {
578                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
579                     System.out.println(s);
580                 }
581             } else {
582                 if (isJvmError(s)) {
583                     logger.error(s);
584                 } else if (logger.isDebugEnabled()) {
585                     logger.debug(s);
586                 }
587 
588                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
589                         + arguments.getForkChannelId() + ".";
590                 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
591                 String dumpPath = dumpFile.getAbsolutePath();
592                 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
593             }
594         }
595 
596         private boolean isJvmError(String line) {
597             String lineLower = line.toLowerCase();
598             for (String errorPattern : JVM_ERROR_PATTERNS) {
599                 if (lineLower.contains(errorPattern)) {
600                     return true;
601                 }
602             }
603             return false;
604         }
605     }
606 
607     public static final class MalformedFrameException extends Exception {
608         private final int readFrom;
609         private final int readTo;
610 
611         public MalformedFrameException(int readFrom, int readTo) {
612             this.readFrom = readFrom;
613             this.readTo = readTo;
614         }
615 
616         public int readFrom() {
617             return readFrom;
618         }
619 
620         public int readTo() {
621             return readTo;
622         }
623 
624         public boolean hasValidPositions() {
625             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
626         }
627     }
628 
    /**
     * Result of a read operation.
     * <br>
     * Underflow - could not completely read out all requested bytes in one call.
     * <br>
     * Overflow - read all requested bytes or more.
     * <br>
     * EOF - end of stream.
     */
    public enum StreamReadStatus {
        // fewer bytes than requested are available
        UNDERFLOW,
        // at least the requested number of bytes is available
        OVERFLOW,
        // NOTE(review): the read methods of this class throw EOFException instead of returning
        // this constant; check the usages outside of this file
        EOF
    }
641 }