1 package org.apache.maven.index.updater;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
57
58
59
60
61
62
63 public class IndexDataReader
64 {
65 private static final Logger LOGGER = LoggerFactory.getLogger( IndexDataReader.class );
66
67 private final DataInputStream dis;
68
69 private final int threads;
70
71 public IndexDataReader( final InputStream is )
72 throws IOException
73 {
74 this( is, 1 );
75 }
76
77 public IndexDataReader( final InputStream is, final int threads )
78 throws IOException
79 {
80 if ( threads < 1 )
81 {
82 throw new IllegalArgumentException( "Reader threads must be greater than zero: " + threads );
83 }
84 this.threads = threads;
85
86
87
88 is.mark( 2 );
89 InputStream data;
90 if ( is.read() == 0x1f && is.read() == 0x8b )
91 {
92 is.reset();
93 data = new BufferedInputStream( new GZIPInputStream( is, 1024 * 8 ), 1024 * 8 );
94 }
95 else
96 {
97 is.reset();
98 data = new BufferedInputStream( is, 1024 * 8 );
99 }
100
101 this.dis = new DataInputStream( data );
102 }
103
104 public IndexDataReadResult readIndex( IndexWriter w, IndexingContext context )
105 throws IOException
106 {
107 if ( threads == 1 )
108 {
109 return readIndexST( w, context );
110 }
111 else
112 {
113 return readIndexMT( w, context );
114 }
115 }
116
117 private IndexDataReadResult readIndexST( IndexWriter w, IndexingContext context )
118 throws IOException
119 {
120 LOGGER.debug( "Reading ST index..." );
121 Instant start = Instant.now();
122 long timestamp = readHeader();
123
124 Date date = null;
125
126 if ( timestamp != -1 )
127 {
128 date = new Date( timestamp );
129
130 IndexUtils.updateTimestamp( w.getDirectory(), date );
131 }
132
133 int n = 0;
134
135 Document doc;
136 ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
137 ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
138
139 while ( ( doc = readDocument() ) != null )
140 {
141 addToIndex( doc, context, w, rootGroups, allGroups );
142 n++;
143 }
144
145 w.commit();
146
147 IndexDataReadResult result = new IndexDataReadResult();
148 result.setDocumentCount( n );
149 result.setTimestamp( date );
150 result.setRootGroups( rootGroups.keySet() );
151 result.setAllGroups( allGroups.keySet() );
152
153 LOGGER.debug( "Reading ST index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
154 return result;
155 }
156
157 private IndexDataReadResult readIndexMT( IndexWriter w, IndexingContext context )
158 throws IOException
159 {
160 LOGGER.debug( "Reading MT index..." );
161 Instant start = Instant.now();
162 long timestamp = readHeader();
163
164 int n = 0;
165
166 final Document theEnd = new Document();
167
168 ConcurrentMap<String, Boolean> rootGroups = new ConcurrentHashMap<>();
169 ConcurrentMap<String, Boolean> allGroups = new ConcurrentHashMap<>();
170 ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>( 10000 );
171
172 ExecutorService executorService = Executors.newFixedThreadPool( threads );
173 ArrayList<Exception> errors = new ArrayList<>();
174 ArrayList<IndexWriter> silos = new ArrayList<>( threads );
175 for ( int i = 0; i < threads; i++ )
176 {
177 final int silo = i;
178 silos.add( tempWriter( "silo" + i ) );
179 executorService.execute( () ->
180 {
181 LOGGER.debug( "Starting thread {}", Thread.currentThread().getName() );
182 try
183 {
184 while ( true )
185 {
186 try
187 {
188 Document doc = queue.take();
189 if ( doc == theEnd )
190 {
191 break;
192 }
193 addToIndex( doc, context, silos.get( silo ), rootGroups, allGroups );
194 }
195 catch ( InterruptedException | IOException e )
196 {
197 errors.add( e );
198 break;
199 }
200 }
201 }
202 finally
203 {
204 LOGGER.debug( "Done thread {}", Thread.currentThread().getName() );
205 }
206 } );
207 }
208
209 try
210 {
211 Document doc;
212 while ( ( doc = readDocument() ) != null )
213 {
214 queue.put( doc );
215 n++;
216 }
217 LOGGER.debug( "Signalling END" );
218 for ( int i = 0; i < threads; i++ )
219 {
220 queue.put( theEnd );
221 }
222
223 LOGGER.debug( "Shutting down threads" );
224 executorService.shutdown();
225 executorService.awaitTermination( 5L, TimeUnit.MINUTES );
226 }
227 catch ( InterruptedException e )
228 {
229 throw new IOException( "Interrupted", e );
230 }
231
232 if ( !errors.isEmpty() )
233 {
234 IOException exception = new IOException( "Error during load of index" );
235 errors.forEach( exception::addSuppressed );
236 throw exception;
237 }
238
239 LOGGER.debug( "Silos loaded..." );
240 Date date = null;
241 if ( timestamp != -1 )
242 {
243 date = new Date( timestamp );
244 IndexUtils.updateTimestamp( w.getDirectory(), date );
245 }
246
247 LOGGER.debug( "Merging silos..." );
248 for ( IndexWriter silo : silos )
249 {
250 IndexUtils.close( silo );
251 w.addIndexes( silo.getDirectory() );
252 }
253
254 LOGGER.debug( "Merged silos..." );
255 w.commit();
256
257 IndexDataReadResult result = new IndexDataReadResult();
258 result.setDocumentCount( n );
259 result.setTimestamp( date );
260 result.setRootGroups( rootGroups.keySet() );
261 result.setAllGroups( allGroups.keySet() );
262
263 LOGGER.debug( "Reading MT index done in {} sec", Duration.between( start, Instant.now() ).getSeconds() );
264 return result;
265 }
266
267 private FSDirectory tempDirectory( final String name ) throws IOException
268 {
269 return FSDirectory.open( Files.createTempDirectory( name + ".dir" ) );
270 }
271
272 private IndexWriter tempWriter( final String name ) throws IOException
273 {
274 IndexWriterConfig config = new IndexWriterConfig( new NexusAnalyzer() );
275 config.setUseCompoundFile( false );
276 return new NexusIndexWriter( tempDirectory( name ), config );
277 }
278
279 private void addToIndex( final Document doc, final IndexingContext context, final IndexWriter indexWriter,
280 final ConcurrentMap<String, Boolean> rootGroups,
281 final ConcurrentMap<String, Boolean> allGroups )
282 throws IOException
283 {
284 ArtifactInfo ai = IndexUtils.constructArtifactInfo( doc, context );
285 if ( ai != null )
286 {
287 indexWriter.addDocument( IndexUtils.updateDocument( doc, context, false, ai ) );
288
289 rootGroups.putIfAbsent( ai.getRootGroup(), Boolean.TRUE );
290 allGroups.putIfAbsent( ai.getGroupId(), Boolean.TRUE );
291 }
292 else
293 {
294
295 if ( doc.getField( ArtifactInfo.ALL_GROUPS ) == null
296 && doc.getField( ArtifactInfo.ROOT_GROUPS ) == null )
297 {
298 indexWriter.addDocument( doc );
299 }
300 }
301 }
302
303 public long readHeader()
304 throws IOException
305 {
306 final byte hdrbyte = (byte) ( ( IndexDataWriter.VERSION << 24 ) >> 24 );
307
308 if ( hdrbyte != dis.readByte() )
309 {
310
311 throw new IOException( "Provided input contains unexpected data (0x01 expected as 1st byte)!" );
312 }
313
314 return dis.readLong();
315 }
316
317 public Document readDocument()
318 throws IOException
319 {
320 int fieldCount;
321 try
322 {
323 fieldCount = dis.readInt();
324 }
325 catch ( EOFException ex )
326 {
327 return null;
328 }
329
330 Document doc = new Document();
331
332 for ( int i = 0; i < fieldCount; i++ )
333 {
334 doc.add( readField() );
335 }
336
337
338 final Field uinfoField = (Field) doc.getField( ArtifactInfo.UINFO );
339 final String info = doc.get( ArtifactInfo.INFO );
340 if ( uinfoField != null && info != null && !info.isEmpty() )
341 {
342 final String[] splitInfo = ArtifactInfo.FS_PATTERN.split( info );
343 if ( splitInfo.length > 6 )
344 {
345 final String extension = splitInfo[6];
346 final String uinfoString = uinfoField.stringValue();
347 if ( uinfoString.endsWith( ArtifactInfo.FS + ArtifactInfo.NA ) )
348 {
349 uinfoField.setStringValue( uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl( extension ) );
350 }
351 }
352 }
353
354 return doc;
355 }
356
357 private Field readField()
358 throws IOException
359 {
360 int flags = dis.read();
361
362 FieldType fieldType = new FieldType();
363 if ( ( flags & IndexDataWriter.F_INDEXED ) > 0 )
364 {
365 boolean tokenized = ( flags & IndexDataWriter.F_TOKENIZED ) > 0;
366 fieldType.setTokenized( tokenized );
367 }
368 fieldType.setIndexOptions( IndexOptions.DOCS_AND_FREQS_AND_POSITIONS );
369 fieldType.setStored( ( flags & IndexDataWriter.F_STORED ) > 0 );
370
371 String name = dis.readUTF();
372 String value = readUTF( dis );
373
374 return new Field( name, value, fieldType );
375 }
376
377 private static String readUTF( DataInput in )
378 throws IOException
379 {
380 int utflen = in.readInt();
381
382 byte[] bytearr;
383 char[] chararr;
384
385 try
386 {
387 bytearr = new byte[utflen];
388 chararr = new char[utflen];
389 }
390 catch ( OutOfMemoryError e )
391 {
392 throw new IOException( "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
393 + " See MINDEXER-28 for more information!", e );
394 }
395
396 int c, char2, char3;
397 int count = 0;
398 int chararrCount = 0;
399
400 in.readFully( bytearr, 0, utflen );
401
402 while ( count < utflen )
403 {
404 c = bytearr[count] & 0xff;
405 if ( c > 127 )
406 {
407 break;
408 }
409 count++;
410 chararr[chararrCount++] = (char) c;
411 }
412
413 while ( count < utflen )
414 {
415 c = bytearr[count] & 0xff;
416 switch ( c >> 4 )
417 {
418 case 0:
419 case 1:
420 case 2:
421 case 3:
422 case 4:
423 case 5:
424 case 6:
425 case 7:
426
427 count++;
428 chararr[chararrCount++] = (char) c;
429 break;
430
431 case 12:
432 case 13:
433
434 count += 2;
435 if ( count > utflen )
436 {
437 throw new UTFDataFormatException( "malformed input: partial character at end" );
438 }
439 char2 = bytearr[count - 1];
440 if ( ( char2 & 0xC0 ) != 0x80 )
441 {
442 throw new UTFDataFormatException( "malformed input around byte " + count );
443 }
444 chararr[chararrCount++] = (char) ( ( ( c & 0x1F ) << 6 ) | ( char2 & 0x3F ) );
445 break;
446
447 case 14:
448
449 count += 3;
450 if ( count > utflen )
451 {
452 throw new UTFDataFormatException( "malformed input: partial character at end" );
453 }
454 char2 = bytearr[count - 2];
455 char3 = bytearr[count - 1];
456 if ( ( ( char2 & 0xC0 ) != 0x80 ) || ( ( char3 & 0xC0 ) != 0x80 ) )
457 {
458 throw new UTFDataFormatException( "malformed input around byte " + ( count - 1 ) );
459 }
460 chararr[chararrCount++] =
461 (char) ( ( ( c & 0x0F ) << 12 ) | ( ( char2 & 0x3F ) << 6 ) | ( ( char3 & 0x3F ) ) );
462 break;
463
464 default:
465
466 throw new UTFDataFormatException( "malformed input around byte " + count );
467 }
468 }
469
470
471 return new String( chararr, 0, chararrCount );
472 }
473
474
475
476
477 public static class IndexDataReadResult
478 {
479 private Date timestamp;
480
481 private int documentCount;
482
483 private Set<String> rootGroups;
484
485 private Set<String> allGroups;
486
487 public void setDocumentCount( int documentCount )
488 {
489 this.documentCount = documentCount;
490 }
491
492 public int getDocumentCount()
493 {
494 return documentCount;
495 }
496
497 public void setTimestamp( Date timestamp )
498 {
499 this.timestamp = timestamp;
500 }
501
502 public Date getTimestamp()
503 {
504 return timestamp;
505 }
506
507 public void setRootGroups( Set<String> rootGroups )
508 {
509 this.rootGroups = rootGroups;
510 }
511
512 public Set<String> getRootGroups()
513 {
514 return rootGroups;
515 }
516
517 public void setAllGroups( Set<String> allGroups )
518 {
519 this.allGroups = allGroups;
520 }
521
522 public Set<String> getAllGroups()
523 {
524 return allGroups;
525 }
526
527 }
528
529
530
531
532
533
534
535
536
537
538 public IndexDataReadResult readIndex( final IndexDataReadVisitor visitor, final IndexingContext context )
539 throws IOException
540 {
541 dis.readByte();
542
543 long timestamp = dis.readLong();
544
545 Date date = null;
546
547 if ( timestamp != -1 )
548 {
549 date = new Date( timestamp );
550 }
551
552 int n = 0;
553
554 Document doc;
555 while ( ( doc = readDocument() ) != null )
556 {
557 visitor.visitDocument( IndexUtils.updateDocument( doc, context, false ) );
558
559 n++;
560 }
561
562 IndexDataReadResult result = new IndexDataReadResult();
563 result.setDocumentCount( n );
564 result.setTimestamp( date );
565 return result;
566 }
567
568
569
570
571 public interface IndexDataReadVisitor
572 {
573
574
575
576
577
578
579 void visitDocument( Document document );
580
581 }
582
583 }