View Javadoc
1   package org.apache.maven.index.updater;
2   
3   /*
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *   http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing,
15   * software distributed under the License is distributed on an
16   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17   * KIND, either express or implied.  See the License for the
18   * specific language governing permissions and limitations
19   * under the License.
20   */
21  
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
57  
58  /**
59   * An index data reader used to parse transfer index format.
60   *
61   * @author Eugene Kuleshov
62   */
63  public class IndexDataReader
64  {
65      private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );
66  
67      private final DataInputStream dis;
68  
69      private final int threads;
70  
71      public IndexDataReader( final InputStream is )
72              throws IOException
73      {
74          this( is, 1 );
75      }
76  
77      public IndexDataReader( final InputStream is, final int threads )
78              throws IOException
79      {
80          if ( threads < 1 )
81          {
82              throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads );
83          }
84          this.threads = threads;
85          // MINDEXER-13
86          // LightweightHttpWagon may have performed automatic decompression
87          // Handle it transparently
88          is.mark( 2 );
89          InputStream data;
90          if ( is.read() == 0x1f && is.read() == 0x8b ) // GZIPInputStream.GZIP_MAGIC
91          {
92              is.reset();
93              data = new BufferedInputStream( new GZIPInputStream( is, 1024 * 8 ), 1024 * 8 );
94          }
95          else
96          {
97              is.reset();
98              data = new BufferedInputStream( is, 1024 * 8 );
99          }
100 
101         this.dis = new DataInputStream( data );
102     }
103 
104     public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
105             throws IOException
106     {
107         if ( threads == 1 )
108         {
109             return readIndexST( w, context );
110         }
111         else
112         {
113             return readIndexMT( w, context );
114         }
115     }
116 
117     private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context )
118             throws IOException
119     {
120         LOGGER.debug( "Reading ST index..." );
121         Instant start = Instant.now();
122         long timestamp = readHeader();
123 
124         Date date = null;
125 
126         if ( timestamp != -1 )
127         {
128             date = new Date( timestamp );
129 
130             IndexUtils.updateTimestamp( w.getDirectory(), date );
131         }
132 
133         int n = 0;
134 
135         Document doc;
136         ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
137         ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
138 
139         while ( ( doc = readDocument() ) != null )
140         {
141             addToIndex( doc, context, w, rootGroups, allGroups );
142             n++;
143         }
144 
145         w.commit();
146 
147         IndexDataReadResult result = new IndexDataReadResult();
148         result.setDocumentCount( n );
149         result.setTimestamp( date );
150         result.setRootGroups( rootGroups.keySet() );
151         result.setAllGroups( allGroups.keySet() );
152 
153         LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
154         return result;
155     }
156 
157     private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
158             throws IOException
159     {
160         LOGGER.debug( "Reading MT index..." );
161         Instant start = Instant.now();
162         long timestamp = readHeader();
163 
164         int n = 0;
165 
166         final Document theEnd = new Document();
167 
168         ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
169         ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
170         ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
171 
172         ExecutorService executorService = Executors.newFixedThreadPool( threads );
173         ArrayList<Exception> errors = new ArrayList<>();
174         ArrayList<IndexWriter> silos = new ArrayList<>( threads );
175         for ( int i = 0; i < threads; i++ )
176         {
177             final int silo = i;
178             silos.add( tempWriter( "silo" + i ) );
179             executorService.execute( () ->
180             {
181                 LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
182                 try
183                 {
184                     while ( true )
185                     {
186                         try
187                         {
188                             Document doc = queue.take();
189                             if ( doc == theEnd )
190                             {
191                                 break;
192                             }
193                             addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
194                         }
195                         catch ( InterruptedException | IOException e )
196                         {
197                             errors.add( e );
198                             break;
199                         }
200                     }
201                 }
202                 finally
203                 {
204                     LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
205                 }
206             } );
207         }
208 
209         try
210         {
211             Document doc;
212             while ( ( doc = readDocument() ) != null )
213             {
214                 queue.put( doc );
215                 n++;
216             }
217             LOGGER.debug( "Signalling END" );
218             for ( int i = 0; i < threads; i++ )
219             {
220                 queue.put( theEnd );
221             }
222 
223             LOGGER.debug( "Shutting down threads" );
224             executorService.shutdown();
225             executorService.awaitTermination( 5L, TimeUnit.MINUTES );
226         }
227         catch ( InterruptedException e )
228         {
229             throw new IOException( "Interrupted", e );
230         }
231 
232         if ( !errors.isEmpty() )
233         {
234             IOException exception = new IOException( "Error during load of index" );
235             errors.forEach( exception::addSuppressed );
236             throw exception;
237         }
238 
239         LOGGER.debug( "Silos loaded..." );
240         Date date = null;
241         if ( timestamp != -1 )
242         {
243             date = new Date( timestamp );
244             IndexUtils.updateTimestamp( w.getDirectory(), date );
245         }
246 
247         LOGGER.debug( "Merging silos..." );
248         for ( IndexWriter silo : silos )
249         {
250             IndexUtils.close( silo );
251             w.addIndexes( silo.getDirectory() );
252         }
253 
254         LOGGER.debug( "Merged silos..." );
255         w.commit();
256 
257         IndexDataReadResult result = new IndexDataReadResult();
258         result.setDocumentCount( n );
259         result.setTimestamp( date );
260         result.setRootGroups( rootGroups.keySet() );
261         result.setAllGroups( allGroups.keySet() );
262 
263         LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
264         return result;
265     }
266 
267     private FSDirectory tempDirectory( final String name ) throws IOException
268     {
269         return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) );
270     }
271 
272     private IndexWriter tempWriter( final String name ) throws IOException
273     {
274         IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
275         config.setUseCompoundFile( false );
276         return new NexusIndexWriter( tempDirectory( name ), config );
277     }
278 
279     private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
280                              final ConcurrentMap<String, Boolean> rootGroups,
281                              final ConcurrentMap<String, Boolean> allGroups )
282             throws IOException
283     {
284         ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
285         if ( ai != null )
286         {
287             indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
288 
289             rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
290             allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
291         }
292         else
293         {
294             // these two fields are automatically handled in code above
295             if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null
296                     && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
297             {
298                 indexWriter.addDocument( doc );
299             }
300         }
301     }
302 
303     public long readHeader()
304             throws IOException
305     {
306         final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );
307 
308         if ( hdrbyte != dis.readByte() )
309         {
310             // data format version mismatch
311             throw new IOException( "Provided input contains unexpected data (0x01 expected as 1st byte)!" );
312         }
313 
314         return dis.readLong();
315     }
316 
317     public Document readDocument()
318             throws IOException
319     {
320         int fieldCount;
321         try
322         {
323             fieldCount = dis.readInt();
324         }
325         catch ( EOFException ex )
326         {
327             return null; // no more documents
328         }
329 
330         Document doc = new Document();
331 
332         for ( int i = 0; i < fieldCount; i++ )
333         {
334             doc.add( readField() );
335         }
336 
337         // Fix up UINFO field wrt MINDEXER-41
338         final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
339         final String info = doc.get( ArtifactInfo.INFO );
340         if ( uinfoField != null && info != null && !info.isEmpty() )
341         {
342             final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
343             if ( splitInfo.length > 6 )
344             {
345                 final String extension = splitInfo[6];
346                 final String uinfoString = uinfoField.stringValue();
347                 if ( uinfoString.endsWith( ArtifactInfo.FS + ArtifactInfo.NA ) )
348                 {
349                     uinfoField.setStringValue( uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl( extension ) );
350                 }
351             }
352         }
353 
354         return doc;
355     }
356 
357     private Field readField()
358             throws IOException
359     {
360         int flags = dis.read();
361 
362         FieldType fieldType = new FieldType();
363         if ( ( flags & IndexDataWriter.F_INDEXED ) > 0 )
364         {
365             boolean tokenized = ( flags & IndexDataWriter.F_TOKENIZED ) > 0;
366             fieldType.setTokenized( tokenized );
367         }
368         fieldType.setIndexOptions( IndexOptions.DOCS_AND_FREQS_AND_POSITIONS );
369         fieldType.setStored( ( flags & IndexDataWriter.F_STORED ) > 0 );
370 
371         String name = dis.readUTF();
372         String value = readUTF( dis );
373 
374         return new Field( name, value, fieldType );
375     }
376 
377     private static String readUTF( DataInput in )
378             throws IOException
379     {
380         int utflen = in.readInt();
381 
382         byte[] bytearr;
383         char[] chararr;
384 
385         try
386         {
387             bytearr = new byte[utflen];
388             chararr = new char[utflen];
389         }
390         catch ( OutOfMemoryError e )
391         {
392             throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
393                     + " See MINDEXER-28 for more information!", e );
394         }
395 
396         int c, char2, char3;
397         int count = 0;
398         int chararrCount = 0;
399 
400         in.readFully( bytearr, 0, utflen );
401 
402         while ( count < utflen )
403         {
404             c = bytearr[count] & 0xff;
405             if ( c > 127 )
406             {
407                 break;
408             }
409             count++;
410             chararr[chararrCount++] = (char) c;
411         }
412 
413         while ( count < utflen )
414         {
415             c = bytearr[count] & 0xff;
416             switch ( c >> 4 )
417             {
418                 case 0:
419                 case 1:
420                 case 2:
421                 case 3:
422                 case 4:
423                 case 5:
424                 case 6:
425                 case 7:
426                     /* 0xxxxxxx */
427                     count++;
428                     chararr[chararrCount++] = (char) c;
429                     break;
430 
431                 case 12:
432                 case 13:
433                     /* 110x xxxx 10xx xxxx */
434                     count += 2;
435                     if ( count > utflen )
436                     {
437                         throw new UTFDataFormatException( "malformed input: partial character at end" );
438                     }
439                     char2 = bytearr[count - 1];
440                     if ( ( char2 & 0xC0 ) != 0x80 )
441                     {
442                         throw new UTFDataFormatException( "malformed input around byte " + count );
443                     }
444                     chararr[chararrCount++] = (char) ( ( ( c & 0x1F ) << 6 ) | ( char2 & 0x3F ) );
445                     break;
446 
447                 case 14:
448                     /* 1110 xxxx 10xx xxxx 10xx xxxx */
449                     count += 3;
450                     if ( count > utflen )
451                     {
452                         throw new UTFDataFormatException( "malformed input: partial character at end" );
453                     }
454                     char2 = bytearr[count - 2];
455                     char3 = bytearr[count - 1];
456                     if ( ( ( char2 & 0xC0 ) != 0x80 ) || ( ( char3 & 0xC0 ) != 0x80 ) )
457                     {
458                         throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
459                     }
460                     chararr[chararrCount++] =
461                             (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
462                     break;
463 
464                 default:
465                     /* 10xx xxxx, 1111 xxxx */
466                     throw new UTFDataFormatException( "malformed input around byte " + count );
467             }
468         }
469 
470         // The number of chars produced may be less than utflen
471         return new String( chararr, 0, chararrCount );
472     }
473 
474     /**
475      * An index data read result holder
476      */
477     public static class IndexDataReadResult
478     {
479         private Date timestamp;
480 
481         private int documentCount;
482 
483         private Set<String> rootGroups;
484 
485         private Set<String> allGroups;
486 
487         public void setDocumentCount( int documentCount )
488         {
489             this.documentCount = documentCount;
490         }
491 
492         public int getDocumentCount()
493         {
494             return documentCount;
495         }
496 
497         public void setTimestamp( Date timestamp )
498         {
499             this.timestamp = timestamp;
500         }
501 
502         public Date getTimestamp()
503         {
504             return timestamp;
505         }
506 
507         public void setRootGroups( Set<String> rootGroups )
508         {
509             this.rootGroups = rootGroups;
510         }
511 
512         public Set<String> getRootGroups()
513         {
514             return rootGroups;
515         }
516 
517         public void setAllGroups( Set<String> allGroups )
518         {
519             this.allGroups = allGroups;
520         }
521 
522         public Set<String> getAllGroups()
523         {
524             return allGroups;
525         }
526 
527     }
528 
529     /**
530      * Reads index content by using a visitor. <br>
531      * The visitor is called for each read documents after it has been populated with Lucene fields.
532      *
533      * @param visitor an index data visitor
534      * @param context indexing context
535      * @return statistics about read data
536      * @throws IOException in case of an IO exception during index file access
537      */
538     public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
539             throws IOException
540     {
541         dis.readByte(); // data format version
542 
543         long timestamp = dis.readLong();
544 
545         Date date = null;
546 
547         if ( timestamp != -1 )
548         {
549             date = new Date( timestamp );
550         }
551 
552         int n = 0;
553 
554         Document doc;
555         while ( ( doc = readDocument() ) != null )
556         {
557             visitor.visitDocument( IndexUtils.updateDocument( doc, context, false ) );
558 
559             n++;
560         }
561 
562         IndexDataReadResult result = new IndexDataReadResult();
563         result.setDocumentCount( n );
564         result.setTimestamp( date );
565         return result;
566     }
567 
568     /**
569      * Visitor of indexed Lucene documents.
570      */
571     public interface IndexDataReadVisitor
572     {
573 
574         /**
575          * Called on each read document. The document is already populated with fields.
576          *
577          * @param document read document
578          */
579         void visitDocument( Document document );
580 
581     }
582 
583 }