View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.stream;
20  
21  import javax.annotation.Nonnegative;
22  import javax.annotation.Nonnull;
23  
24  import java.io.EOFException;
25  import java.io.File;
26  import java.io.IOException;
27  import java.nio.ByteBuffer;
28  import java.nio.CharBuffer;
29  import java.nio.channels.ReadableByteChannel;
30  import java.nio.charset.Charset;
31  import java.nio.charset.CharsetDecoder;
32  import java.nio.charset.CoderResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
36  
37  import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
38  import org.apache.maven.surefire.api.fork.ForkNodeArguments;
39  
40  import static java.lang.Math.max;
41  import static java.lang.Math.min;
42  import static java.nio.charset.CodingErrorAction.REPLACE;
43  import static java.nio.charset.StandardCharsets.US_ASCII;
44  import static java.util.Arrays.copyOf;
45  import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
46  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
47  import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
48  import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
49  
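/**
 * Base class of decoders which read {@code ':'}-delimited message frames from a
 * {@link ReadableByteChannel} and convert them to message objects.
 *
 * @param <M> message object
 * @param <MT> enum describing the meaning of the message
 * @param <ST> enum for segment type
 */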
55  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
56      public static final int BUFFER_SIZE = 1024;
57  
58      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
59  
60      private static final String[] JVM_ERROR_PATTERNS = {
61          "could not create the java virtual machine",
62          "error occurred during initialization", // of VM, of boot layer
63          "error:", // general errors
64          "could not reserve enough space",
65          "could not allocate",
66          "unable to allocate", // memory errors
67          "java.lang.module.findexception" // JPMS errors
68      };
69  
70      private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
71              DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
72  
73      private static final int NO_POSITION = -1;
74      private static final int DELIMITER_LENGTH = 1;
75      private static final int BYTE_LENGTH = 1;
76      private static final int INT_LENGTH = 4;
77      private static final int LONG_LENGTH = 8;
78  
79      private final ReadableByteChannel channel;
80      private final ForkNodeArguments arguments;
81      private final Map<Segment, MT> messageTypes;
82      private final ConsoleLogger logger;
83  
84      protected AbstractStreamDecoder(
85              @Nonnull ReadableByteChannel channel,
86              @Nonnull ForkNodeArguments arguments,
87              @Nonnull Map<Segment, MT> messageTypes) {
88          this.channel = channel;
89          this.arguments = arguments;
90          this.messageTypes = messageTypes;
91          logger = arguments.getConsoleLogger();
92      }
93  
94      public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
95  
96      @Nonnull
97      protected abstract byte[] getEncodedMagicNumber();
98  
99      @Nonnull
100     protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
101 
102     @Nonnull
103     protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
104 
105     @Nonnull
106     protected final ForkNodeArguments getArguments() {
107         return arguments;
108     }
109 
110     protected void debugStream(byte[] array, int position, int remaining) {}
111 
112     protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
113         byte[] header = getEncodedMagicNumber();
114         // Read the header one byte at a time so that non-protocol output (e.g. the JDWP
115         // "Listening for transport dt_socket at address: ..." line) is flushed to the console
116         // before the decoder blocks. That line contains ':' which looks like a frame start;
117         // checking each magic-number byte individually lets us bail out immediately on a
118         // mismatch instead of stalling on a 24-byte read that never completes.
119         read(memento, DELIMITER_LENGTH);
120         ByteBuffer bb = memento.getByteBuffer();
121         if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != ':') {
122             checkHeader(memento); // not ':', immediately throws MalformedFrameException
123         }
124         checkDelimiter(memento); // consume the ':'
125         for (int i = 0; i < header.length; i++) {
126             read(memento, DELIMITER_LENGTH);
127             bb = memento.getByteBuffer();
128             if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != (header[i] & 0xff)) {
129                 // Mismatch: report the ':' plus all bytes read so far in this frame attempt.
130                 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + 1);
131             }
132             bb.position(bb.position() + 1);
133         }
134         read(memento, DELIMITER_LENGTH);
135         checkDelimiter(memento);
136         return messageTypes.get(readSegment(memento));
137     }
138 
139     @Nonnull
140     @SuppressWarnings("checkstyle:magicnumber")
141     protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
142         int readCount = readByte(memento) & 0xff;
143         read(memento, readCount + DELIMITER_LENGTH);
144         ByteBuffer bb = memento.getByteBuffer();
145         Segment segment = new Segment(bb.array(), bb.arrayOffset() + bb.position(), readCount);
146         bb.position(bb.position() + readCount);
147         checkDelimiter(memento);
148         return segment;
149     }
150 
151     @Nonnull
152     @SuppressWarnings("checkstyle:magicnumber")
153     protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
154         int length = readByte(memento) & 0xff;
155         read(memento, length + DELIMITER_LENGTH);
156         ByteBuffer bb = memento.getByteBuffer();
157         byte[] array = bb.array();
158         int offset = bb.arrayOffset() + bb.position();
159         bb.position(bb.position() + length);
160         boolean isDefaultEncoding = false;
161         if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
162             isDefaultEncoding = true;
163             for (int i = 0; i < length; i++) {
164                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
165             }
166         }
167 
168         try {
169             Charset charset = isDefaultEncoding
170                     ? DEFAULT_STREAM_ENCODING
171                     : Charset.forName(new String(array, offset, length, US_ASCII));
172 
173             checkDelimiter(memento);
174             return charset;
175         } catch (IllegalArgumentException e) {
176             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
177         }
178     }
179 
180     protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
181         (memento.getCharBuffer()).clear();
182         int readCount = readInt(memento);
183         if (readCount < 0) {
184             throw new MalformedFrameException(
185                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
186         }
187         read(memento, readCount + DELIMITER_LENGTH);
188 
189         final String string;
190         if (readCount == 0) {
191             string = "";
192         } else if (readCount == 1) {
193             read(memento, 1);
194             byte oneChar = memento.getByteBuffer().get();
195             string = oneChar == 0 ? null : String.valueOf((char) oneChar);
196         } else {
197             string = readString(memento, readCount);
198         }
199         read(memento, 1);
200         checkDelimiter(memento);
201         return string;
202     }
203 
204     protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
205         read(memento, BYTE_LENGTH);
206         boolean isNullObject = memento.getByteBuffer().get() == 0;
207         if (isNullObject) {
208             read(memento, DELIMITER_LENGTH);
209             checkDelimiter(memento);
210             return null;
211         }
212         return readInt(memento);
213     }
214 
215     protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
216         read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
217         byte b = memento.getByteBuffer().get();
218         checkDelimiter(memento);
219         return b;
220     }
221 
222     protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
223         read(memento, INT_LENGTH + DELIMITER_LENGTH);
224         int i = memento.getByteBuffer().getInt();
225         checkDelimiter(memento);
226         return i;
227     }
228 
229     protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
230         read(memento, BYTE_LENGTH);
231         boolean isNullObject = memento.getByteBuffer().get() == 0;
232         if (isNullObject) {
233             read(memento, DELIMITER_LENGTH);
234             checkDelimiter(memento);
235             return null;
236         }
237         return readLongPrivate(memento);
238     }
239 
240     protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
241         read(memento, LONG_LENGTH + DELIMITER_LENGTH);
242         long num = memento.getByteBuffer().getLong();
243         checkDelimiter(memento);
244         return num;
245     }
246 
247     @SuppressWarnings("checkstyle:magicnumber")
248     protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
249         ByteBuffer bb = memento.bb;
250         if ((0xff & bb.get()) != ':') {
251             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position());
252         }
253     }
254 
255     protected final void checkHeader(Memento memento) throws MalformedFrameException {
256         ByteBuffer bb = memento.bb;
257 
258         checkDelimiter(memento);
259 
260         int shift = 0;
261         try {
262             byte[] header = getEncodedMagicNumber();
263             byte[] bbArray = bb.array();
264             for (int start = bb.arrayOffset() + bb.position(), length = header.length; shift < length; shift++) {
265                 if (bbArray[shift + start] != header[shift]) {
266                     throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + shift);
267                 }
268             }
269         } finally {
270             bb.position(bb.position() + shift);
271         }
272 
273         checkDelimiter(memento);
274     }
275 
276     protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
277         if (memento.getData().size() != expectedDataElements) {
278             throw new MalformedFrameException(
279                     memento.getLine().getPositionByteBuffer(), (memento.getByteBuffer()).position());
280         }
281     }
282 
283     private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
284             throws IOException, MalformedFrameException {
285         memento.getDecoder().reset();
286         final CharBuffer output = memento.getCharBuffer();
287         (output).clear();
288         final ByteBuffer input = memento.getByteBuffer();
289         final List<String> strings = new ArrayList<>();
290         int countDecodedBytes = 0;
291         for (boolean endOfInput = false; !endOfInput; ) {
292             final int bytesToRead = totalBytes - countDecodedBytes;
293             read(memento, bytesToRead);
294             int bytesToDecode = min(input.remaining(), bytesToRead);
295             final boolean isLastChunk = bytesToDecode == bytesToRead;
296             endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
297             do {
298                 boolean endOfChunk = output.remaining() >= bytesToRead;
299                 boolean endOfOutput = isLastChunk && endOfChunk;
300                 int readInputBytes = decodeString(
301                         memento.getDecoder(),
302                         input,
303                         output,
304                         bytesToDecode,
305                         endOfOutput,
306                         memento.getLine().getPositionByteBuffer());
307                 bytesToDecode -= readInputBytes;
308                 countDecodedBytes += readInputBytes;
309             } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
310 
311             strings.add((output).flip().toString());
312             (output).clear();
313         }
314 
315         memento.getDecoder().reset();
316         (output).clear();
317 
318         return toString(strings);
319     }
320 
321     private static int decodeString(
322             @Nonnull CharsetDecoder decoder,
323             @Nonnull ByteBuffer input,
324             @Nonnull CharBuffer output,
325             @Nonnegative int bytesToDecode,
326             boolean endOfInput,
327             @Nonnegative int errorStreamFrom)
328             throws MalformedFrameException {
329         int limit = (input).limit();
330         (input).limit((input).position() + bytesToDecode);
331 
332         CoderResult result = decoder.decode(input, output, endOfInput);
333         if (result.isError() || result.isMalformed()) {
334             throw new MalformedFrameException(errorStreamFrom, (input).position());
335         }
336 
337         int decodedBytes = bytesToDecode - input.remaining();
338         (input).limit(limit);
339         return decodedBytes;
340     }
341 
342     private static String toString(List<String> strings) {
343         if (strings.size() == 1) {
344             return strings.get(0);
345         }
346         StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
347         for (String s : strings) {
348             concatenated.append(s);
349         }
350         return concatenated.toString();
351     }
352 
353     private void printCorruptedStream(Memento memento) {
354         ByteBuffer bb = memento.getByteBuffer();
355         if (bb.hasRemaining()) {
356             int bytesToWrite = bb.remaining();
357             memento.getLine().write(bb, bb.position(), bytesToWrite);
358             bb.position(bb.position() + bytesToWrite);
359         }
360     }
361 
362     /**
363      * Print the last string which has not been finished by a new line character.
364      *
365      * @param memento current memento object
366      */
367     protected final void printRemainingStream(Memento memento) {
368         printCorruptedStream(memento);
369         memento.getLine().printExistingLine();
370         memento.getLine().clear();
371     }
372 
373     public static final class Segment {
374         private final byte[] array;
375         private final int fromIndex;
376         private final int length;
377         private final int hashCode;
378 
379         public Segment(byte[] array, int fromIndex, int length) {
380             this.array = array;
381             this.fromIndex = fromIndex;
382             this.length = length;
383 
384             int hashCode = 0;
385             int i = fromIndex;
386             for (int loops = length >> 1; loops-- != 0; ) {
387                 hashCode = 31 * hashCode + array[i++];
388                 hashCode = 31 * hashCode + array[i++];
389             }
390             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
391         }
392 
393         @Override
394         public int hashCode() {
395             return hashCode;
396         }
397 
398         @Override
399         public boolean equals(Object obj) {
400             if (!(obj instanceof Segment)) {
401                 return false;
402             }
403 
404             Segment that = (Segment) obj;
405             if (that.length != length) {
406                 return false;
407             }
408 
409             for (int i = 0; i < length; i++) {
410                 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
411                     return false;
412                 }
413             }
414             return true;
415         }
416     }
417 
418     protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
419         ByteBuffer buffer = memento.getByteBuffer();
420         if (buffer.remaining() >= recommendedCount && buffer.limit() != 0) {
421             return OVERFLOW;
422         } else {
423             if (buffer.position() != 0 && recommendedCount > buffer.capacity() - buffer.position()) {
424                 (buffer.compact()).flip();
425                 memento.getLine().setPositionByteBuffer(0);
426             }
427             int mark = buffer.position();
428             buffer.position(buffer.limit());
429             buffer.limit(min(buffer.position() + recommendedCount, buffer.capacity()));
430             return read(buffer, mark, recommendedCount);
431         }
432     }
433 
434     private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
435         StreamReadStatus readStatus = null;
436         boolean isEnd = false;
437         try {
438             while (!isEnd && buffer.position() - oldPosition < recommendedCount && buffer.position() < buffer.limit()) {
439                 isEnd = channel.read(buffer) == -1;
440             }
441         } finally {
442             buffer.limit(buffer.position());
443             buffer.position(oldPosition);
444             int readBytes = buffer.remaining();
445             boolean readComplete = readBytes >= recommendedCount;
446             if (!isEnd || readComplete) {
447                 debugStream(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
448                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
449             }
450         }
451 
452         if (readStatus == null) {
453             throw new EOFException();
454         } else {
455             return readStatus;
456         }
457     }
458 
459     public final class Memento {
460         private CharsetDecoder currentDecoder;
461         private final CharsetDecoder defaultDecoder;
462         private final BufferedStream line = new BufferedStream(32);
463         private final List<Object> data = new ArrayList<>();
464         private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
465         private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
466 
467         public Memento() {
468             defaultDecoder = DEFAULT_STREAM_ENCODING
469                     .newDecoder()
470                     .onMalformedInput(REPLACE)
471                     .onUnmappableCharacter(REPLACE);
472             bb.limit(0);
473         }
474 
475         public void reset() {
476             currentDecoder = null;
477             data.clear();
478         }
479 
480         public CharsetDecoder getDecoder() {
481             return currentDecoder == null ? defaultDecoder : currentDecoder;
482         }
483 
484         public void setCharset(Charset charset) {
485             if (charset.name().equals(defaultDecoder.charset().name())) {
486                 currentDecoder = defaultDecoder;
487             } else {
488                 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
489             }
490         }
491 
492         public BufferedStream getLine() {
493             return line;
494         }
495 
496         public List<Object> getData() {
497             return data;
498         }
499 
500         public <T> T ofDataAt(int indexOfData) {
501             //noinspection unchecked
502             return (T) data.get(indexOfData);
503         }
504 
505         public CharBuffer getCharBuffer() {
506             return cb;
507         }
508 
509         public ByteBuffer getByteBuffer() {
510             return bb;
511         }
512     }
513 
514     /**
515      * This class avoids locking which gains the performance of this decoder.
516      */
517     public final class BufferedStream {
518         private byte[] buffer;
519         private int count;
520         private int positionByteBuffer;
521         private boolean isNewLine;
522 
523         BufferedStream(int capacity) {
524             this.buffer = new byte[capacity];
525         }
526 
527         public int getPositionByteBuffer() {
528             return positionByteBuffer;
529         }
530 
531         public void setPositionByteBuffer(int positionByteBuffer) {
532             this.positionByteBuffer = positionByteBuffer;
533         }
534 
535         public void write(ByteBuffer bb, int position, int length) {
536             ensureCapacity(length);
537             byte[] array = bb.array();
538             int pos = bb.arrayOffset() + position;
539             while (length-- > 0) {
540                 positionByteBuffer++;
541                 byte b = array[pos++];
542                 if (b == '\r' || b == '\n') {
543                     if (!isNewLine) {
544                         printExistingLine();
545                         count = 0;
546                     }
547                     isNewLine = true;
548                 } else {
549                     buffer[count++] = b;
550                     isNewLine = false;
551                 }
552             }
553         }
554 
555         public void clear() {
556             count = 0;
557         }
558 
559         @Override
560         public String toString() {
561             return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
562         }
563 
564         private boolean isEmpty() {
565             return count == 0;
566         }
567 
568         private void ensureCapacity(int addCapacity) {
569             int oldCapacity = buffer.length;
570             int exactCapacity = count + addCapacity;
571             if (exactCapacity < 0) {
572                 throw new OutOfMemoryError();
573             }
574 
575             if (oldCapacity < exactCapacity) {
576                 int newCapacity = oldCapacity << 1;
577                 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
578             }
579         }
580 
581         void printExistingLine() {
582             if (isEmpty()) {
583                 return;
584             }
585 
586             String s = toString();
587             if (isBlank(s)) {
588                 return;
589             }
590 
591             if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
592                 if (logger.isDebugEnabled()) {
593                     logger.debug(s);
594                 } else if (logger.isInfoEnabled()) {
595                     logger.info(s);
596                 } else {
597                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
598                     System.out.println(s);
599                 }
600             } else {
601                 if (isJvmError(s)) {
602                     logger.error(s);
603                 } else if (logger.isDebugEnabled()) {
604                     logger.debug(s);
605                 }
606 
607                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
608                         + arguments.getForkChannelId() + ".";
609                 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
610                 String dumpPath = dumpFile.getAbsolutePath();
611                 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
612             }
613         }
614 
615         private boolean isJvmError(String line) {
616             String lineLower = line.toLowerCase();
617             for (String errorPattern : JVM_ERROR_PATTERNS) {
618                 if (lineLower.contains(errorPattern)) {
619                     return true;
620                 }
621             }
622             return false;
623         }
624     }
625 
626     public static final class MalformedFrameException extends Exception {
627         private final int readFrom;
628         private final int readTo;
629 
630         public MalformedFrameException(int readFrom, int readTo) {
631             this.readFrom = readFrom;
632             this.readTo = readTo;
633         }
634 
635         public int readFrom() {
636             return readFrom;
637         }
638 
639         public int readTo() {
640             return readTo;
641         }
642 
643         public boolean hasValidPositions() {
644             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
645         }
646     }
647 
648     /**
649      * Underflow - could not completely read out al bytes in one call.
650      * <br>
651      * Overflow - read all bytes or more
652      * <br>
653      * EOF - end of stream.
654      */
655     public enum StreamReadStatus {
656         UNDERFLOW,
657         OVERFLOW,
658         EOF
659     }
660 }