View Javadoc
1   /*
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *   http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing,
13   * software distributed under the License is distributed on an
14   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   * KIND, either express or implied.  See the License for the
16   * specific language governing permissions and limitations
17   * under the License.
18   */
19  package org.apache.maven.surefire.api.stream;
20  
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;

import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CoderResult;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;

import org.apache.maven.plugin.surefire.log.api.ConsoleLogger;
import org.apache.maven.surefire.api.fork.ForkNodeArguments;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static java.nio.charset.CodingErrorAction.REPLACE;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.Arrays.copyOf;
import static org.apache.maven.surefire.api.booter.Constants.DEFAULT_STREAM_ENCODING;
import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.OVERFLOW;
import static org.apache.maven.surefire.api.stream.AbstractStreamDecoder.StreamReadStatus.UNDERFLOW;
import static org.apache.maven.surefire.shared.lang3.StringUtils.isBlank;
50  
51  /**
52   * @param <M> message object
53   * @param <MT> enum describing the meaning of the message
54   * @param <ST> enum for segment type
55   */
56  public abstract class AbstractStreamDecoder<M, MT extends Enum<MT>, ST extends Enum<ST>> implements AutoCloseable {
57      public static final int BUFFER_SIZE = 1024;
58  
59      private static final String PRINTABLE_JVM_NATIVE_STREAM = "Listening for transport dt_socket at address:";
60  
61      private static final String[] JVM_ERROR_PATTERNS = {
62          "could not create the java virtual machine",
63          "error occurred during initialization", // of VM, of boot layer
64          "error:", // general errors
65          "could not reserve enough space",
66          "could not allocate",
67          "unable to allocate", // memory errors
68          "java.lang.module.findexception" // JPMS errors
69      };
70  
71      private static final byte[] DEFAULT_STREAM_ENCODING_BYTES =
72              DEFAULT_STREAM_ENCODING.name().getBytes(US_ASCII);
73  
74      private static final int NO_POSITION = -1;
75      private static final int DELIMITER_LENGTH = 1;
76      private static final int BYTE_LENGTH = 1;
77      private static final int INT_LENGTH = 4;
78      private static final int LONG_LENGTH = 8;
79  
80      private final ReadableByteChannel channel;
81      private final ForkNodeArguments arguments;
82      private final Map<Segment, MT> messageTypes;
83      private final ConsoleLogger logger;
84  
85      protected AbstractStreamDecoder(
86              @Nonnull ReadableByteChannel channel,
87              @Nonnull ForkNodeArguments arguments,
88              @Nonnull Map<Segment, MT> messageTypes) {
89          this.channel = channel;
90          this.arguments = arguments;
91          this.messageTypes = messageTypes;
92          logger = arguments.getConsoleLogger();
93      }
94  
95      public abstract M decode(@Nonnull Memento memento) throws MalformedChannelException, IOException;
96  
97      @Nonnull
98      protected abstract byte[] getEncodedMagicNumber();
99  
100     @Nonnull
101     protected abstract ST[] nextSegmentType(@Nonnull MT messageType);
102 
103     @Nonnull
104     protected abstract M toMessage(@Nonnull MT messageType, @Nonnull Memento memento) throws MalformedFrameException;
105 
106     @Nonnull
107     protected final ForkNodeArguments getArguments() {
108         return arguments;
109     }
110 
111     protected void debugStream(byte[] array, int position, int remaining) {}
112 
113     protected MT readMessageType(@Nonnull Memento memento) throws IOException, MalformedFrameException {
114         byte[] header = getEncodedMagicNumber();
115         // Read the header one byte at a time so that non-protocol output (e.g. the JDWP
116         // "Listening for transport dt_socket at address: ..." line) is flushed to the console
117         // before the decoder blocks. That line contains ':' which looks like a frame start;
118         // checking each magic-number byte individually lets us bail out immediately on a
119         // mismatch instead of stalling on a 24-byte read that never completes.
120         read(memento, DELIMITER_LENGTH);
121         ByteBuffer bb = memento.getByteBuffer();
122         if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != ':') {
123             checkHeader(memento); // not ':', immediately throws MalformedFrameException
124         }
125         checkDelimiter(memento); // consume the ':'
126         for (int i = 0; i < header.length; i++) {
127             read(memento, DELIMITER_LENGTH);
128             bb = memento.getByteBuffer();
129             if ((bb.array()[bb.arrayOffset() + bb.position()] & 0xff) != (header[i] & 0xff)) {
130                 // Mismatch: report the ':' plus all bytes read so far in this frame attempt.
131                 throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), bb.position() + 1);
132             }
133             bb.position(bb.position() + 1);
134         }
135         read(memento, DELIMITER_LENGTH);
136         checkDelimiter(memento);
137         return messageTypes.get(readSegment(memento));
138     }
139 
140     @Nonnull
141     @SuppressWarnings("checkstyle:magicnumber")
142     protected Segment readSegment(@Nonnull Memento memento) throws IOException, MalformedFrameException {
143         int readCount = readByte(memento) & 0xff;
144         read(memento, readCount + DELIMITER_LENGTH);
145         ByteBuffer bb = memento.getByteBuffer();
146         Segment segment = new Segment(bb.array(), bb.arrayOffset() + ((Buffer) bb).position(), readCount);
147         ((Buffer) bb).position(((Buffer) bb).position() + readCount);
148         checkDelimiter(memento);
149         return segment;
150     }
151 
152     @Nonnull
153     @SuppressWarnings("checkstyle:magicnumber")
154     protected Charset readCharset(@Nonnull Memento memento) throws IOException, MalformedFrameException {
155         int length = readByte(memento) & 0xff;
156         read(memento, length + DELIMITER_LENGTH);
157         ByteBuffer bb = memento.getByteBuffer();
158         byte[] array = bb.array();
159         int offset = bb.arrayOffset() + ((Buffer) bb).position();
160         ((Buffer) bb).position(((Buffer) bb).position() + length);
161         boolean isDefaultEncoding = false;
162         if (length == DEFAULT_STREAM_ENCODING_BYTES.length) {
163             isDefaultEncoding = true;
164             for (int i = 0; i < length; i++) {
165                 isDefaultEncoding &= DEFAULT_STREAM_ENCODING_BYTES[i] == array[offset + i];
166             }
167         }
168 
169         try {
170             Charset charset = isDefaultEncoding
171                     ? DEFAULT_STREAM_ENCODING
172                     : Charset.forName(new String(array, offset, length, US_ASCII));
173 
174             checkDelimiter(memento);
175             return charset;
176         } catch (IllegalArgumentException e) {
177             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
178         }
179     }
180 
181     protected String readString(@Nonnull Memento memento) throws IOException, MalformedFrameException {
182         ((Buffer) memento.getCharBuffer()).clear();
183         int readCount = readInt(memento);
184         if (readCount < 0) {
185             throw new MalformedFrameException(
186                     memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
187         }
188         read(memento, readCount + DELIMITER_LENGTH);
189 
190         final String string;
191         if (readCount == 0) {
192             string = "";
193         } else if (readCount == 1) {
194             read(memento, 1);
195             byte oneChar = memento.getByteBuffer().get();
196             string = oneChar == 0 ? null : String.valueOf((char) oneChar);
197         } else {
198             string = readString(memento, readCount);
199         }
200         read(memento, 1);
201         checkDelimiter(memento);
202         return string;
203     }
204 
205     protected Integer readInteger(@Nonnull Memento memento) throws IOException, MalformedFrameException {
206         read(memento, BYTE_LENGTH);
207         boolean isNullObject = memento.getByteBuffer().get() == 0;
208         if (isNullObject) {
209             read(memento, DELIMITER_LENGTH);
210             checkDelimiter(memento);
211             return null;
212         }
213         return readInt(memento);
214     }
215 
216     protected byte readByte(@Nonnull Memento memento) throws IOException, MalformedFrameException {
217         read(memento, BYTE_LENGTH + DELIMITER_LENGTH);
218         byte b = memento.getByteBuffer().get();
219         checkDelimiter(memento);
220         return b;
221     }
222 
223     protected int readInt(@Nonnull Memento memento) throws IOException, MalformedFrameException {
224         read(memento, INT_LENGTH + DELIMITER_LENGTH);
225         int i = memento.getByteBuffer().getInt();
226         checkDelimiter(memento);
227         return i;
228     }
229 
230     protected Long readLong(@Nonnull Memento memento) throws IOException, MalformedFrameException {
231         read(memento, BYTE_LENGTH);
232         boolean isNullObject = memento.getByteBuffer().get() == 0;
233         if (isNullObject) {
234             read(memento, DELIMITER_LENGTH);
235             checkDelimiter(memento);
236             return null;
237         }
238         return readLongPrivate(memento);
239     }
240 
241     protected long readLongPrivate(@Nonnull Memento memento) throws IOException, MalformedFrameException {
242         read(memento, LONG_LENGTH + DELIMITER_LENGTH);
243         long num = memento.getByteBuffer().getLong();
244         checkDelimiter(memento);
245         return num;
246     }
247 
248     @SuppressWarnings("checkstyle:magicnumber")
249     protected final void checkDelimiter(Memento memento) throws MalformedFrameException {
250         ByteBuffer bb = memento.bb;
251         if ((0xff & bb.get()) != ':') {
252             throw new MalformedFrameException(memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position());
253         }
254     }
255 
256     protected final void checkHeader(Memento memento) throws MalformedFrameException {
257         ByteBuffer bb = memento.bb;
258 
259         checkDelimiter(memento);
260 
261         int shift = 0;
262         try {
263             byte[] header = getEncodedMagicNumber();
264             byte[] bbArray = bb.array();
265             for (int start = bb.arrayOffset() + ((Buffer) bb).position(), length = header.length;
266                     shift < length;
267                     shift++) {
268                 if (bbArray[shift + start] != header[shift]) {
269                     throw new MalformedFrameException(
270                             memento.getLine().getPositionByteBuffer(), ((Buffer) bb).position() + shift);
271                 }
272             }
273         } finally {
274             ((Buffer) bb).position(((Buffer) bb).position() + shift);
275         }
276 
277         checkDelimiter(memento);
278     }
279 
280     protected void checkArguments(Memento memento, int expectedDataElements) throws MalformedFrameException {
281         if (memento.getData().size() != expectedDataElements) {
282             throw new MalformedFrameException(
283                     memento.getLine().getPositionByteBuffer(), ((Buffer) memento.getByteBuffer()).position());
284         }
285     }
286 
287     private String readString(@Nonnull final Memento memento, @Nonnegative final int totalBytes)
288             throws IOException, MalformedFrameException {
289         memento.getDecoder().reset();
290         final CharBuffer output = memento.getCharBuffer();
291         ((Buffer) output).clear();
292         final ByteBuffer input = memento.getByteBuffer();
293         final List<String> strings = new ArrayList<>();
294         int countDecodedBytes = 0;
295         for (boolean endOfInput = false; !endOfInput; ) {
296             final int bytesToRead = totalBytes - countDecodedBytes;
297             read(memento, bytesToRead);
298             int bytesToDecode = min(input.remaining(), bytesToRead);
299             final boolean isLastChunk = bytesToDecode == bytesToRead;
300             endOfInput = countDecodedBytes + bytesToDecode >= totalBytes;
301             do {
302                 boolean endOfChunk = output.remaining() >= bytesToRead;
303                 boolean endOfOutput = isLastChunk && endOfChunk;
304                 int readInputBytes = decodeString(
305                         memento.getDecoder(),
306                         input,
307                         output,
308                         bytesToDecode,
309                         endOfOutput,
310                         memento.getLine().getPositionByteBuffer());
311                 bytesToDecode -= readInputBytes;
312                 countDecodedBytes += readInputBytes;
313             } while (isLastChunk && bytesToDecode > 0 && output.hasRemaining());
314 
315             strings.add(((Buffer) output).flip().toString());
316             ((Buffer) output).clear();
317         }
318 
319         memento.getDecoder().reset();
320         ((Buffer) output).clear();
321 
322         return toString(strings);
323     }
324 
325     private static int decodeString(
326             @Nonnull CharsetDecoder decoder,
327             @Nonnull ByteBuffer input,
328             @Nonnull CharBuffer output,
329             @Nonnegative int bytesToDecode,
330             boolean endOfInput,
331             @Nonnegative int errorStreamFrom)
332             throws MalformedFrameException {
333         int limit = ((Buffer) input).limit();
334         ((Buffer) input).limit(((Buffer) input).position() + bytesToDecode);
335 
336         CoderResult result = decoder.decode(input, output, endOfInput);
337         if (result.isError() || result.isMalformed()) {
338             throw new MalformedFrameException(errorStreamFrom, ((Buffer) input).position());
339         }
340 
341         int decodedBytes = bytesToDecode - input.remaining();
342         ((Buffer) input).limit(limit);
343         return decodedBytes;
344     }
345 
346     private static String toString(List<String> strings) {
347         if (strings.size() == 1) {
348             return strings.get(0);
349         }
350         StringBuilder concatenated = new StringBuilder(strings.size() * BUFFER_SIZE);
351         for (String s : strings) {
352             concatenated.append(s);
353         }
354         return concatenated.toString();
355     }
356 
357     private void printCorruptedStream(Memento memento) {
358         ByteBuffer bb = memento.getByteBuffer();
359         if (bb.hasRemaining()) {
360             int bytesToWrite = bb.remaining();
361             memento.getLine().write(bb, ((Buffer) bb).position(), bytesToWrite);
362             ((Buffer) bb).position(((Buffer) bb).position() + bytesToWrite);
363         }
364     }
365 
366     /**
367      * Print the last string which has not been finished by a new line character.
368      *
369      * @param memento current memento object
370      */
371     protected final void printRemainingStream(Memento memento) {
372         printCorruptedStream(memento);
373         memento.getLine().printExistingLine();
374         memento.getLine().clear();
375     }
376 
377     public static final class Segment {
378         private final byte[] array;
379         private final int fromIndex;
380         private final int length;
381         private final int hashCode;
382 
383         public Segment(byte[] array, int fromIndex, int length) {
384             this.array = array;
385             this.fromIndex = fromIndex;
386             this.length = length;
387 
388             int hashCode = 0;
389             int i = fromIndex;
390             for (int loops = length >> 1; loops-- != 0; ) {
391                 hashCode = 31 * hashCode + array[i++];
392                 hashCode = 31 * hashCode + array[i++];
393             }
394             this.hashCode = i == fromIndex + length ? hashCode : 31 * hashCode + array[i];
395         }
396 
397         @Override
398         public int hashCode() {
399             return hashCode;
400         }
401 
402         @Override
403         public boolean equals(Object obj) {
404             if (!(obj instanceof Segment)) {
405                 return false;
406             }
407 
408             Segment that = (Segment) obj;
409             if (that.length != length) {
410                 return false;
411             }
412 
413             for (int i = 0; i < length; i++) {
414                 if (that.array[that.fromIndex + i] != array[fromIndex + i]) {
415                     return false;
416                 }
417             }
418             return true;
419         }
420     }
421 
422     protected @Nonnull StreamReadStatus read(@Nonnull Memento memento, int recommendedCount) throws IOException {
423         ByteBuffer buffer = memento.getByteBuffer();
424         if (buffer.remaining() >= recommendedCount && ((Buffer) buffer).limit() != 0) {
425             return OVERFLOW;
426         } else {
427             if (((Buffer) buffer).position() != 0
428                     && recommendedCount > buffer.capacity() - ((Buffer) buffer).position()) {
429                 ((Buffer) buffer.compact()).flip();
430                 memento.getLine().setPositionByteBuffer(0);
431             }
432             int mark = ((Buffer) buffer).position();
433             ((Buffer) buffer).position(((Buffer) buffer).limit());
434             ((Buffer) buffer).limit(min(((Buffer) buffer).position() + recommendedCount, buffer.capacity()));
435             return read(buffer, mark, recommendedCount);
436         }
437     }
438 
439     private StreamReadStatus read(ByteBuffer buffer, int oldPosition, int recommendedCount) throws IOException {
440         StreamReadStatus readStatus = null;
441         boolean isEnd = false;
442         try {
443             while (!isEnd
444                     && ((Buffer) buffer).position() - oldPosition < recommendedCount
445                     && ((Buffer) buffer).position() < ((Buffer) buffer).limit()) {
446                 isEnd = channel.read(buffer) == -1;
447             }
448         } finally {
449             ((Buffer) buffer).limit(((Buffer) buffer).position());
450             ((Buffer) buffer).position(oldPosition);
451             int readBytes = buffer.remaining();
452             boolean readComplete = readBytes >= recommendedCount;
453             if (!isEnd || readComplete) {
454                 debugStream(buffer.array(), buffer.arrayOffset() + ((Buffer) buffer).position(), buffer.remaining());
455                 readStatus = readComplete ? OVERFLOW : UNDERFLOW;
456             }
457         }
458 
459         if (readStatus == null) {
460             throw new EOFException();
461         } else {
462             return readStatus;
463         }
464     }
465 
466     public final class Memento {
467         private CharsetDecoder currentDecoder;
468         private final CharsetDecoder defaultDecoder;
469         private final BufferedStream line = new BufferedStream(32);
470         private final List<Object> data = new ArrayList<>();
471         private final CharBuffer cb = CharBuffer.allocate(BUFFER_SIZE);
472         private final ByteBuffer bb = ByteBuffer.allocate(BUFFER_SIZE);
473 
474         public Memento() {
475             defaultDecoder = DEFAULT_STREAM_ENCODING
476                     .newDecoder()
477                     .onMalformedInput(REPLACE)
478                     .onUnmappableCharacter(REPLACE);
479             ((Buffer) bb).limit(0);
480         }
481 
482         public void reset() {
483             currentDecoder = null;
484             data.clear();
485         }
486 
487         public CharsetDecoder getDecoder() {
488             return currentDecoder == null ? defaultDecoder : currentDecoder;
489         }
490 
491         public void setCharset(Charset charset) {
492             if (charset.name().equals(defaultDecoder.charset().name())) {
493                 currentDecoder = defaultDecoder;
494             } else {
495                 currentDecoder = charset.newDecoder().onMalformedInput(REPLACE).onUnmappableCharacter(REPLACE);
496             }
497         }
498 
499         public BufferedStream getLine() {
500             return line;
501         }
502 
503         public List<Object> getData() {
504             return data;
505         }
506 
507         public <T> T ofDataAt(int indexOfData) {
508             //noinspection unchecked
509             return (T) data.get(indexOfData);
510         }
511 
512         public CharBuffer getCharBuffer() {
513             return cb;
514         }
515 
516         public ByteBuffer getByteBuffer() {
517             return bb;
518         }
519     }
520 
521     /**
522      * This class avoids locking which gains the performance of this decoder.
523      */
524     public final class BufferedStream {
525         private byte[] buffer;
526         private int count;
527         private int positionByteBuffer;
528         private boolean isNewLine;
529 
530         BufferedStream(int capacity) {
531             this.buffer = new byte[capacity];
532         }
533 
534         public int getPositionByteBuffer() {
535             return positionByteBuffer;
536         }
537 
538         public void setPositionByteBuffer(int positionByteBuffer) {
539             this.positionByteBuffer = positionByteBuffer;
540         }
541 
542         public void write(ByteBuffer bb, int position, int length) {
543             ensureCapacity(length);
544             byte[] array = bb.array();
545             int pos = bb.arrayOffset() + position;
546             while (length-- > 0) {
547                 positionByteBuffer++;
548                 byte b = array[pos++];
549                 if (b == '\r' || b == '\n') {
550                     if (!isNewLine) {
551                         printExistingLine();
552                         count = 0;
553                     }
554                     isNewLine = true;
555                 } else {
556                     buffer[count++] = b;
557                     isNewLine = false;
558                 }
559             }
560         }
561 
562         public void clear() {
563             count = 0;
564         }
565 
566         @Override
567         public String toString() {
568             return new String(buffer, 0, count, DEFAULT_STREAM_ENCODING);
569         }
570 
571         private boolean isEmpty() {
572             return count == 0;
573         }
574 
575         private void ensureCapacity(int addCapacity) {
576             int oldCapacity = buffer.length;
577             int exactCapacity = count + addCapacity;
578             if (exactCapacity < 0) {
579                 throw new OutOfMemoryError();
580             }
581 
582             if (oldCapacity < exactCapacity) {
583                 int newCapacity = oldCapacity << 1;
584                 buffer = copyOf(buffer, max(newCapacity, exactCapacity));
585             }
586         }
587 
588         void printExistingLine() {
589             if (isEmpty()) {
590                 return;
591             }
592 
593             String s = toString();
594             if (isBlank(s)) {
595                 return;
596             }
597 
598             if (s.contains(PRINTABLE_JVM_NATIVE_STREAM)) {
599                 if (logger.isDebugEnabled()) {
600                     logger.debug(s);
601                 } else if (logger.isInfoEnabled()) {
602                     logger.info(s);
603                 } else {
604                     // In case of debugging forked JVM, see PRINTABLE_JVM_NATIVE_STREAM.
605                     System.out.println(s);
606                 }
607             } else {
608                 if (isJvmError(s)) {
609                     logger.error(s);
610                 } else if (logger.isDebugEnabled()) {
611                     logger.debug(s);
612                 }
613 
614                 String msg = "Corrupted channel by directly writing to native stream in forked JVM "
615                         + arguments.getForkChannelId() + ".";
616                 File dumpFile = arguments.dumpStreamText(msg + " Stream '" + s + "'.");
617                 String dumpPath = dumpFile.getAbsolutePath();
618                 arguments.logWarningAtEnd(msg + " See FAQ web page and the dump file " + dumpPath);
619             }
620         }
621 
622         private boolean isJvmError(String line) {
623             String lineLower = line.toLowerCase();
624             for (String errorPattern : JVM_ERROR_PATTERNS) {
625                 if (lineLower.contains(errorPattern)) {
626                     return true;
627                 }
628             }
629             return false;
630         }
631     }
632 
633     public static final class MalformedFrameException extends Exception {
634         private final int readFrom;
635         private final int readTo;
636 
637         public MalformedFrameException(int readFrom, int readTo) {
638             this.readFrom = readFrom;
639             this.readTo = readTo;
640         }
641 
642         public int readFrom() {
643             return readFrom;
644         }
645 
646         public int readTo() {
647             return readTo;
648         }
649 
650         public boolean hasValidPositions() {
651             return readFrom != NO_POSITION && readTo != NO_POSITION && readTo - readFrom > 0;
652         }
653     }
654 
655     /**
656      * Underflow - could not completely read out al bytes in one call.
657      * <br>
658      * Overflow - read all bytes or more
659      * <br>
660      * EOF - end of stream.
661      */
662     public enum StreamReadStatus {
663         UNDERFLOW,
664         OVERFLOW,
665         EOF
666     }
667 }