View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.stream;
20  
21  import javax.annotation.Nonnegative;
22  import javax.annotation.Nonnull;
23  
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.nio.CharBuffer;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.charset.Charset;
31  import java.nio.charset.CharsetDecoder;
32  import java.nio.charset.CoderResult;
33  import java.util.ArrayList;
34  import java.util.List;
35  import java.util.Map;
36  
37  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39  
40  import static java.lang.Math.max;
41  import static java.lang.Math.min;
42  import static java.nio.charset.CodingErrorAction.REPLACE;
43  import static java.nio.charset.StandardCharsets.US_ASCII;
44  import static java.util.Arrays.copyOf;
45  import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48  import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49  
50  /**
51   * @param <M> message object
52   * @param <MT> enum describing the meaning of the message
53   * @param <ST> enum for segment type
54   */
55  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
56      public static final int BUFFER_SIZE = 1024;
57  
58      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
59  
60      private static final String[] JVM_ERROR_PATTERNS = {
61          "could not create the java virtual machine",
62          "error occurred during initialization", // of VM, of boot layer
63          "error:", // general errors
64          "could not reserve enough space",
65          "could not allocate",
66          "unable to allocate", // memory errors
67          "java.lang.module.findexception" // JPMS errors
68      };
69  
70      private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
71              DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
72  
73      private static final int NO_POSITION = -1;
74      private static final int DELIMITER_LENGTH = 1;
75      private static final int BYTE_LENGTH = 1;
76      private static final int INT_LENGTH = 4;
77      private static final int LONG_LENGTH = 8;
78  
79      private final ReadableByteChannel channel;
80      private final ForkNodeArguments arguments;
81      private final Map<Segment, MT> messageTypes;
82      private final ConsoleLogger logger;
83  
84      protected AbstractStreamDecoder(
85              @Nonnull ReadableByteChannel channel,
86              @Nonnull ForkNodeArguments arguments,
87              @Nonnull Map<Segment, MT> messageTypes) {
88          this.channel = channel;
89          this.arguments = arguments;
90          this.messageTypes = messageTypes;
91          logger = arguments.getConsoleLogger();
92      }
93  
94      public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
95  
96      @Nonnull
97      protected abstract byte[] getEncodedMagicNumber();
98  
99      @Nonnull
100     protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
101 
102     @Nonnull
103     protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104 
105     @Nonnull
106     protected final ForkNodeArguments getArguments() {
107         return arguments;
108     }
109 
110     protected void debugStream(byte[] array, int position, int remaining) {}
111 
112     protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113         byte[] header = getEncodedMagicNumber();
114         int readCount = DELIMITER_LENGTH + header.length + DELIMITER_LENGTH + BYTE_LENGTH + DELIMITER_LENGTH;
115         read(memento, readCount);
116         checkHeader(memento);
117         return messageTypes.get(readSegment(memento));
118     }
119 
120     @Nonnull
121     @SuppressWarnings("checkstyle:magicnumber")
122     protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
123         int readCount = readByte(memento) & 0xff;
124         read(memento, readCount + DELIMITER_LENGTH);
125         ByteBuffer bb = memento.getByteBuffer();
126         Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
127         bb.position(bb.position() + readCount);
128         checkDelimiter(memento);
129         return segment;
130     }
131 
132     @Nonnull
133     @SuppressWarnings("checkstyle:magicnumber")
134     protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
135         int length = readByte(memento) & 0xff;
136         read(memento, length + DELIMITER_LENGTH);
137         ByteBuffer bb = memento.getByteBuffer();
138         byte[] array = bb.array();
139         int offset = bb.arrayOffset() + bb.position();
140         bb.position(bb.position() + length);
141         boolean isDefaultEncoding = false;
142         if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
143             isDefaultEncoding = true;
144             for (int i = 0; i < length; i++) {
145                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
146             }
147         }
148 
149         try {
150             Charset charset = isDefaultEncoding
151                     ? DEFAULT_STREAM_ENCODING
152                     : Charset.forName(new String(array, offset, length, US_ASCII));
153 
154             checkDelimiter(memento);
155             return charset;
156         } catch (IllegalArgumentException e) {
157             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
158         }
159     }
160 
161     protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
162         (memento.getCharBuffer()).clear();
163         int readCount = readInt(memento);
164         if (readCount < 0) {
165             throw new MalformedFrameException(
166                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
167         }
168         read(memento, readCount + DELIMITER_LENGTH);
169 
170         final String string;
171         if (readCount == 0) {
172             string = "";
173         } else if (readCount == 1) {
174             read(memento, 1);
175             byte oneChar = memento.getByteBuffer().get();
176             string = oneChar == 0 ? null : String.valueOf((char) oneChar);
177         } else {
178             string = readString(memento, readCount);
179         }
180         read(memento, 1);
181         checkDelimiter(memento);
182         return string;
183     }
184 
185     protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
186         read(memento, BYTE_LENGTH);
187         boolean isNullObject = memento.getByteBuffer().get() == 0;
188         if (isNullObject) {
189             read(memento, DELIMITER_LENGTH);
190             checkDelimiter(memento);
191             return null;
192         }
193         return readInt(memento);
194     }
195 
196     protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
197         read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
198         byte b = memento.getByteBuffer().get();
199         checkDelimiter(memento);
200         return b;
201     }
202 
203     protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
204         read(memento, INT_LENGTH + DELIMITER_LENGTH);
205         int i = memento.getByteBuffer().getInt();
206         checkDelimiter(memento);
207         return i;
208     }
209 
210     protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
211         read(memento, BYTE_LENGTH);
212         boolean isNullObject = memento.getByteBuffer().get() == 0;
213         if (isNullObject) {
214             read(memento, DELIMITER_LENGTH);
215             checkDelimiter(memento);
216             return null;
217         }
218         return readLongPrivate(memento);
219     }
220 
221     protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
222         read(memento, LONG_LENGTH + DELIMITER_LENGTH);
223         long num = memento.getByteBuffer().getLong();
224         checkDelimiter(memento);
225         return num;
226     }
227 
228     @SuppressWarnings("checkstyle:magicnumber")
229     protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
230         ByteBuffer bb = memento.bb;
231         if ((0xff & bb.get()) != ':') {
232             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
233         }
234     }
235 
236     protected final void checkHeader(Memento memento) throws MalformedFrameException {
237         ByteBuffer bb = memento.bb;
238 
239         checkDelimiter(memento);
240 
241         int shift = 0;
242         try {
243             byte[] header = getEncodedMagicNumber();
244             byte[] bbArray = bb.array();
245             for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
246                 if (bbArray[shift + start] != header[shift]) {
247                     throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
248                 }
249             }
250         } finally {
251             bb.position(bb.position() + shift);
252         }
253 
254         checkDelimiter(memento);
255     }
256 
257     protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
258         if (memento.getData().size() != expectedDataElements) {
259             throw new MalformedFrameException(
260                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
261         }
262     }
263 
264     private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
265             throws IOException, MalformedFrameException {
266         memento.getDecoder().reset();
267         final CharBuffer output = memento.getCharBuffer();
268         (output).clear();
269         final ByteBuffer input = memento.getByteBuffer();
270         final List<String> strings = new ArrayList<>();
271         int countDecodedBytes = 0;
272         for (boolean endOfInput = false; !endOfInput; ) {
273             final int bytesToRead = totalBytes - countDecodedBytes;
274             read(memento, bytesToRead);
275             int bytesToDecode = min(input.remaining(), bytesToRead);
276             final boolean isLastChunk = bytesToDecode == bytesToRead;
277             endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
278             do {
279                 boolean endOfChunk = output.remaining() >= bytesToRead;
280                 boolean endOfOutput = isLastChunk && endOfChunk;
281                 int readInputBytes = decodeString(
282                         memento.getDecoder(),
283                         input,
284                         output,
285                         bytesToDecode,
286                         endOfOutput,
287                         memento.getLine().getPositionByteBuffer());
288                 bytesToDecode -= readInputBytes;
289                 countDecodedBytes += readInputBytes;
290             } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
291 
292             strings.add((output).flip().toString());
293             (output).clear();
294         }
295 
296         memento.getDecoder().reset();
297         (output).clear();
298 
299         return toString(strings);
300     }
301 
302     private static int decodeString(
303             @Nonnull CharsetDecoder decoder,
304             @Nonnull ByteBuffer input,
305             @Nonnull CharBuffer output,
306             @Nonnegative int bytesToDecode,
307             boolean endOfInput,
308             @Nonnegative int errorStreamFrom)
309             throws MalformedFrameException {
310         int limit = (input).limit();
311         (input).limit((input).position() + bytesToDecode);
312 
313         CoderResult result = decoder.decode(input, output, endOfInput);
314         if (result.isError() || result.isMalformed()) {
315             throw new MalformedFrameException(errorStreamFrom, (input).position());
316         }
317 
318         int decodedBytes = bytesToDecode - input.remaining();
319         (input).limit(limit);
320         return decodedBytes;
321     }
322 
323     private static String toString(List<String> strings) {
324         if (strings.size() == 1) {
325             return strings.get(0);
326         }
327         StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
328         for (String s : strings) {
329             concatenated.append(s);
330         }
331         return concatenated.toString();
332     }
333 
334     private void printCorruptedStream(Memento memento) {
335         ByteBuffer bb = memento.getByteBuffer();
336         if (bb.hasRemaining()) {
337             int bytesToWrite = bb.remaining();
338             memento.getLine().write(bb, bb.position(), bytesToWrite);
339             bb.position(bb.position() + bytesToWrite);
340         }
341     }
342 
343     /**
344      * Print the last string which has not been finished by a new line character.
345      *
346      * @param memento current memento object
347      */
348     protected final void printRemainingStream(Memento memento) {
349         printCorruptedStream(memento);
350         memento.getLine().printExistingLine();
351         memento.getLine().clear();
352     }
353 
354     /**
355      *
356      */
357     public static final class Segment {
358         private final byte[] array;
359         private final int fromIndex;
360         private final int length;
361         private final int hashCode;
362 
363         public Segment(byte[] array, int fromIndex, int length) {
364             this.array = array;
365             this.fromIndex = fromIndex;
366             this.length = length;
367 
368             int hashCode = 0;
369             int i = fromIndex;
370             for (int loops = length >> 1; loops-- != 0; ) {
371                 hashCode = 31 * hashCode + array[i++];
372                 hashCode = 31 * hashCode + array[i++];
373             }
374             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
375         }
376 
377         @Override
378         public int hashCode() {
379             return hashCode;
380         }
381 
382         @Override
383         public boolean equals(Object obj) {
384             if (!(obj instanceof Segment)) {
385                 return false;
386             }
387 
388             Segment that = (Segment) obj;
389             if (that.length != length) {
390                 return false;
391             }
392 
393             for (int i = 0; i < length; i++) {
394                 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
395                     return false;
396                 }
397             }
398             return true;
399         }
400     }
401 
402     protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
403         ByteBuffer buffer = memento.getByteBuffer();
404         if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
405             return OVERFLOW;
406         } else {
407             if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
408                 (buffer.compact()).flip();
409                 memento.getLine().setPositionByteBuffer(0);
410             }
411             int mark = buffer.position();
412             buffer.position(buffer.limit());
413             buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
414             return read(buffer, mark, recommendedCount);
415         }
416     }
417 
418     private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
419         StreamReadStatus readStatus = null;
420         boolean isEnd = false;
421         try {
422             while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
423                 isEnd = channel.read(buffer) == -1;
424             }
425         } finally {
426             buffer.limit(buffer.position());
427             buffer.position(oldPosition);
428             int readBytes = buffer.remaining();
429             boolean readComplete = readBytes >= recommendedCount;
430             if (!isEnd || readComplete) {
431                 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
432                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
433             }
434         }
435 
436         if (readStatus == null) {
437             throw new EOFException();
438         } else {
439             return readStatus;
440         }
441     }
442 
443     /**
444      *
445      */
446     public final class Memento {
447         private CharsetDecoder currentDecoder;
448         private final CharsetDecoder defaultDecoder;
449         private final BufferedStream line = new BufferedStream(32);
450         private final List<Object> data = new ArrayList<>();
451         private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
452         private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
453 
454         public Memento() {
455             defaultDecoder = DEFAULT_STREAM_ENCODING
456                     .newDecoder()
457                     .onMalformedInput(REPLACE)
458                     .onUnmappableCharacter(REPLACE);
459             bb.limit(0);
460         }
461 
462         public void reset() {
463             currentDecoder = null;
464             data.clear();
465         }
466 
467         public CharsetDecoder getDecoder() {
468             return currentDecoder == null ? defaultDecoder : currentDecoder;
469         }
470 
471         public void setCharset(Charset charset) {
472             if (charset.name().equals(defaultDecoder.charset().name())) {
473                 currentDecoder = defaultDecoder;
474             } else {
475                 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
476             }
477         }
478 
479         public BufferedStream getLine() {
480             return line;
481         }
482 
483         public List<Object> getData() {
484             return data;
485         }
486 
487         public <T> T ofDataAt(int indexOfData) {
488             //noinspection unchecked
489             return (T) data.get(indexOfData);
490         }
491 
492         public CharBuffer getCharBuffer() {
493             return cb;
494         }
495 
496         public ByteBuffer getByteBuffer() {
497             return bb;
498         }
499     }
500 
501     /**
502      * This class avoids locking which gains the performance of this decoder.
503      */
504     public final class BufferedStream {
505         private byte[] buffer;
506         private int count;
507         private int positionByteBuffer;
508         private boolean isNewLine;
509 
510         BufferedStream(int capacity) {
511             this.buffer = new byte[capacity];
512         }
513 
514         public int getPositionByteBuffer() {
515             return positionByteBuffer;
516         }
517 
518         public void setPositionByteBuffer(int positionByteBuffer) {
519             this.positionByteBuffer = positionByteBuffer;
520         }
521 
522         public void write(ByteBuffer bb, int position, int length) {
523             ensureCapacity(length);
524             byte[] array = bb.array();
525             int pos = bb.arrayOffset() + position;
526             while (length-- > 0) {
527                 positionByteBuffer++;
528                 byte b = array[pos++];
529                 if (b == '\r' || b == '\n') {
530                     if (!isNewLine) {
531                         printExistingLine();
532                         count = 0;
533                     }
534                     isNewLine = true;
535                 } else {
536                     buffer[count++] = b;
537                     isNewLine = false;
538                 }
539             }
540         }
541 
542         public void clear() {
543             count = 0;
544         }
545 
546         @Override
547         public String toString() {
548             return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
549         }
550 
551         private boolean isEmpty() {
552             return count == 0;
553         }
554 
555         private void ensureCapacity(int addCapacity) {
556             int oldCapacity = buffer.length;
557             int exactCapacity = count + addCapacity;
558             if (exactCapacity < 0) {
559                 throw new OutOfMemoryError();
560             }
561 
562             if (oldCapacity < exactCapacity) {
563                 int newCapacity = oldCapacity << 1;
564                 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
565             }
566         }
567 
568         void printExistingLine() {
569             if (isEmpty()) {
570                 return;
571             }
572 
573             String s = toString();
574             if (isBlank(s)) {
575                 return;
576             }
577 
578             if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
579                 if (logger.isDebugEnabled()) {
580                     logger.debug(s);
581                 } else if (logger.isInfoEnabled()) {
582                     logger.info(s);
583                 } else {
584                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
585                     System.out.println(s);
586                 }
587             } else {
588                 if (isJvmError(s)) {
589                     logger.error(s);
590                 } else if (logger.isDebugEnabled()) {
591                     logger.debug(s);
592                 }
593 
594                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
595                         + arguments.getForkChannelId() + ".";
596                 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
597                 String dumpPath = dumpFile.getAbsolutePath();
598                 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
599             }
600         }
601 
602         private boolean isJvmError(String line) {
603             String lineLower = line.toLowerCase();
604             for (String errorPattern : JVM_ERROR_PATTERNS) {
605                 if (lineLower.contains(errorPattern)) {
606                     return true;
607                 }
608             }
609             return false;
610         }
611     }
612 
613     /**
614      *
615      */
616     public static final class MalformedFrameException extends Exception {
617         private final int readFrom;
618         private final int readTo;
619 
620         public MalformedFrameException(int readFrom, int readTo) {
621             this.readFrom = readFrom;
622             this.readTo = readTo;
623         }
624 
625         public int readFrom() {
626             return readFrom;
627         }
628 
629         public int readTo() {
630             return readTo;
631         }
632 
633         public boolean hasValidPositions() {
634             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
635         }
636     }
637 
638     /**
639      * Underflow - could not completely read out al bytes in one call.
640      * <br>
641      * Overflow - read all bytes or more
642      * <br>
643      * EOF - end of stream
644      */
645     public enum StreamReadStatus {
646         UNDERFLOW,
647         OVERFLOW,
648         EOF
649     }
650 }