1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.maven.index.updater;
20
import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UTFDataFormatException;
import java.nio.file.Files;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.FieldType;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.maven.index.ArtifactInfo;
import org.apache.maven.index.context.IndexUtils;
import org.apache.maven.index.context.IndexingContext;
import org.apache.maven.index.context.NexusAnalyzer;
import org.apache.maven.index.context.NexusIndexWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
58
59
60
61
62
63
64 public class IndexDataReader {
65 private static final Logger LOGGER = LoggerFactory.getLogger(IndexDataReader.class);
66
67 private final DataInputStream dis;
68
69 private final int threads;
70
71 public IndexDataReader(final InputStream is) throws IOException {
72 this(is, 1);
73 }
74
75 public IndexDataReader(final InputStream is, final int threads) throws IOException {
76 if (threads < 1) {
77 throw new IllegalArgumentException("Reader threads must be greater than zero: " + threads);
78 }
79 this.threads = threads;
80
81
82
83 is.mark(2);
84 InputStream data;
85 if (is.read() == 0x1f && is.read() == 0x8b)
86 {
87 is.reset();
88 data = new BufferedInputStream(new GZIPInputStream(is, 1024 * 8), 1024 * 8);
89 } else {
90 is.reset();
91 data = new BufferedInputStream(is, 1024 * 8);
92 }
93
94 this.dis = new DataInputStream(data);
95 }
96
97 public IndexDataReadResult readIndex(IndexWriter w, IndexingContext context) throws IOException {
98 if (threads == 1) {
99 return readIndexST(w, context);
100 } else {
101 return readIndexMT(w, context);
102 }
103 }
104
105 private IndexDataReadResult readIndexST(IndexWriter w, IndexingContext context) throws IOException {
106 LOGGER.debug("Reading ST index...");
107 Instant start = Instant.now();
108 long timestamp = readHeader();
109
110 Date date = null;
111
112 if (timestamp != -1) {
113 date = new Date(timestamp);
114
115 IndexUtils.updateTimestamp(w.getDirectory(), date);
116 }
117
118 int n = 0;
119
120 Document doc;
121 Set<String> rootGroups = new HashSet<>();
122 Set<String> allGroups = new HashSet<>();
123
124 while ((doc = readDocument()) != null) {
125 addToIndex(doc, context, w, rootGroups, allGroups);
126 n++;
127 }
128
129 w.commit();
130
131 IndexDataReadResult result = new IndexDataReadResult();
132 result.setDocumentCount(n);
133 result.setTimestamp(date);
134 result.setRootGroups(rootGroups);
135 result.setAllGroups(allGroups);
136
137 LOGGER.debug(
138 "Reading ST index done in {} sec",
139 Duration.between(start, Instant.now()).getSeconds());
140 return result;
141 }
142
143 private IndexDataReadResult readIndexMT(IndexWriter w, IndexingContext context) throws IOException {
144 LOGGER.debug("Reading MT index...");
145 Instant start = Instant.now();
146 long timestamp = readHeader();
147
148 int n = 0;
149
150 final Document theEnd = new Document();
151
152 Set<String> rootGroups = ConcurrentHashMap.newKeySet();
153 Set<String> allGroups = ConcurrentHashMap.newKeySet();
154 ArrayBlockingQueue<Document> queue = new ArrayBlockingQueue<>(10000);
155
156 ExecutorService executorService = Executors.newFixedThreadPool(threads);
157 ArrayList<Exception> errors = new ArrayList<>();
158 ArrayList<FSDirectory> siloDirectories = new ArrayList<>(threads);
159 ArrayList<IndexWriter> siloWriters = new ArrayList<>(threads);
160 LOGGER.debug("Creating {} silo writer threads...", threads);
161 for (int i = 0; i < threads; i++) {
162 final int silo = i;
163 FSDirectory siloDirectory = tempDirectory("silo" + i);
164 siloDirectories.add(siloDirectory);
165 siloWriters.add(tempWriter(siloDirectory));
166 executorService.execute(() -> {
167 LOGGER.debug("Starting thread {}", Thread.currentThread().getName());
168 try {
169 while (true) {
170 try {
171 Document doc = queue.take();
172 if (doc == theEnd) {
173 break;
174 }
175 addToIndex(doc, context, siloWriters.get(silo), rootGroups, allGroups);
176 } catch (InterruptedException | IOException e) {
177 errors.add(e);
178 break;
179 }
180 }
181 } finally {
182 LOGGER.debug("Done thread {}", Thread.currentThread().getName());
183 }
184 });
185 }
186
187 LOGGER.debug("Loading up documents into silos");
188 try {
189 Document doc;
190 while ((doc = readDocument()) != null) {
191 queue.put(doc);
192 n++;
193 }
194 LOGGER.debug("Signalling END");
195 for (int i = 0; i < threads; i++) {
196 queue.put(theEnd);
197 }
198
199 LOGGER.debug("Shutting down threads");
200 executorService.shutdown();
201 executorService.awaitTermination(5L, TimeUnit.MINUTES);
202 } catch (InterruptedException e) {
203 throw new IOException("Interrupted", e);
204 }
205
206 if (!errors.isEmpty()) {
207 IOException exception = new IOException("Error during load of index");
208 errors.forEach(exception::addSuppressed);
209 throw exception;
210 }
211
212 LOGGER.debug("Silos loaded...");
213 Date date = null;
214 if (timestamp != -1) {
215 date = new Date(timestamp);
216 IndexUtils.updateTimestamp(w.getDirectory(), date);
217 }
218
219 LOGGER.debug("Closing silo writers...");
220 for (IndexWriter siloWriter : siloWriters) {
221 siloWriter.commit();
222 siloWriter.close();
223 }
224
225 LOGGER.debug("Merging silo directories...");
226 w.addIndexes(siloDirectories.toArray(new Directory[0]));
227
228 LOGGER.debug("Cleanup of silo directories...");
229 for (FSDirectory siloDirectory : siloDirectories) {
230 File dir = siloDirectory.getDirectory().toFile();
231 siloDirectory.close();
232 IndexUtils.delete(dir);
233 }
234
235 LOGGER.debug("Finalizing...");
236 w.commit();
237
238 IndexDataReadResult result = new IndexDataReadResult();
239 result.setDocumentCount(n);
240 result.setTimestamp(date);
241 result.setRootGroups(rootGroups);
242 result.setAllGroups(allGroups);
243
244 LOGGER.debug(
245 "Reading MT index done in {} sec",
246 Duration.between(start, Instant.now()).getSeconds());
247 return result;
248 }
249
250 private FSDirectory tempDirectory(final String name) throws IOException {
251 return FSDirectory.open(Files.createTempDirectory(name + ".dir"));
252 }
253
254 private IndexWriter tempWriter(final FSDirectory directory) throws IOException {
255 IndexWriterConfig config = new IndexWriterConfig(new NexusAnalyzer());
256 config.setUseCompoundFile(false);
257 return new NexusIndexWriter(directory, config);
258 }
259
260 private void addToIndex(
261 final Document doc,
262 final IndexingContext context,
263 final IndexWriter indexWriter,
264 final Set<String> rootGroups,
265 final Set<String> allGroups)
266 throws IOException {
267 ArtifactInfo ai = IndexUtils.constructArtifactInfo(doc, context);
268 if (ai != null) {
269 indexWriter.addDocument(IndexUtils.updateDocument(doc, context, false, ai));
270
271 rootGroups.add(ai.getRootGroup());
272 allGroups.add(ai.getGroupId());
273 } else {
274
275 if (doc.getField(ArtifactInfo.ALL_GROUPS) == null && doc.getField(ArtifactInfo.ROOT_GROUPS) == null) {
276 indexWriter.addDocument(doc);
277 }
278 }
279 }
280
281 public long readHeader() throws IOException {
282 final byte hdrbyte = (byte) ((IndexDataWriter.VERSION << 24) >> 24);
283
284 if (hdrbyte != dis.readByte()) {
285
286 throw new IOException("Provided input contains unexpected data (0x01 expected as 1st byte)!");
287 }
288
289 return dis.readLong();
290 }
291
292 public Document readDocument() throws IOException {
293 int fieldCount;
294 try {
295 fieldCount = dis.readInt();
296 } catch (EOFException ex) {
297 return null;
298 }
299
300 Document doc = new Document();
301
302 for (int i = 0; i < fieldCount; i++) {
303 doc.add(readField());
304 }
305
306
307 final Field uinfoField = (Field) doc.getField(ArtifactInfo.UINFO);
308 final String info = doc.get(ArtifactInfo.INFO);
309 if (uinfoField != null && info != null && !info.isEmpty()) {
310 final String[] splitInfo = ArtifactInfo.FS_PATTERN.split(info);
311 if (splitInfo.length > 6) {
312 final String extension = splitInfo[6];
313 final String uinfoString = uinfoField.stringValue();
314 if (uinfoString.endsWith(ArtifactInfo.FS + ArtifactInfo.NA)) {
315 uinfoField.setStringValue(uinfoString + ArtifactInfo.FS + ArtifactInfo.nvl(extension));
316 }
317 }
318 }
319
320 return doc;
321 }
322
323 private Field readField() throws IOException {
324 int flags = dis.read();
325
326 FieldType fieldType = new FieldType();
327 if ((flags & IndexDataWriter.F_INDEXED) > 0) {
328 boolean tokenized = (flags & IndexDataWriter.F_TOKENIZED) > 0;
329 fieldType.setTokenized(tokenized);
330 }
331 fieldType.setIndexOptions(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS);
332 fieldType.setStored((flags & IndexDataWriter.F_STORED) > 0);
333
334 String name = dis.readUTF();
335 String value = readUTF(dis);
336
337 return new Field(name, value, fieldType);
338 }
339
340 private static String readUTF(DataInput in) throws IOException {
341 int utflen = in.readInt();
342
343 byte[] bytearr;
344 char[] chararr;
345
346 try {
347 bytearr = new byte[utflen];
348 chararr = new char[utflen];
349 } catch (OutOfMemoryError e) {
350 throw new IOException(
351 "Index data content is inappropriate (is junk?), leads to OutOfMemoryError!"
352 + " See MINDEXER-28 for more information!",
353 e);
354 }
355
356 int c, char2, char3;
357 int count = 0;
358 int chararrCount = 0;
359
360 in.readFully(bytearr, 0, utflen);
361
362 while (count < utflen) {
363 c = bytearr[count] & 0xff;
364 if (c > 127) {
365 break;
366 }
367 count++;
368 chararr[chararrCount++] = (char) c;
369 }
370
371 while (count < utflen) {
372 c = bytearr[count] & 0xff;
373 switch (c >> 4) {
374 case 0:
375 case 1:
376 case 2:
377 case 3:
378 case 4:
379 case 5:
380 case 6:
381 case 7:
382
383 count++;
384 chararr[chararrCount++] = (char) c;
385 break;
386
387 case 12:
388 case 13:
389
390 count += 2;
391 if (count > utflen) {
392 throw new UTFDataFormatException("malformed input: partial character at end");
393 }
394 char2 = bytearr[count - 1];
395 if ((char2 & 0xC0) != 0x80) {
396 throw new UTFDataFormatException("malformed input around byte " + count);
397 }
398 chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
399 break;
400
401 case 14:
402
403 count += 3;
404 if (count > utflen) {
405 throw new UTFDataFormatException("malformed input: partial character at end");
406 }
407 char2 = bytearr[count - 2];
408 char3 = bytearr[count - 1];
409 if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
410 throw new UTFDataFormatException("malformed input around byte " + (count - 1));
411 }
412 chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F)));
413 break;
414
415 default:
416
417 throw new UTFDataFormatException("malformed input around byte " + count);
418 }
419 }
420
421
422 return new String(chararr, 0, chararrCount);
423 }
424
425
426
427
428 public static class IndexDataReadResult {
429 private Date timestamp;
430
431 private int documentCount;
432
433 private Set<String> rootGroups;
434
435 private Set<String> allGroups;
436
437 public void setDocumentCount(int documentCount) {
438 this.documentCount = documentCount;
439 }
440
441 public int getDocumentCount() {
442 return documentCount;
443 }
444
445 public void setTimestamp(Date timestamp) {
446 this.timestamp = timestamp;
447 }
448
449 public Date getTimestamp() {
450 return timestamp;
451 }
452
453 public void setRootGroups(Set<String> rootGroups) {
454 this.rootGroups = rootGroups;
455 }
456
457 public Set<String> getRootGroups() {
458 return rootGroups;
459 }
460
461 public void setAllGroups(Set<String> allGroups) {
462 this.allGroups = allGroups;
463 }
464
465 public Set<String> getAllGroups() {
466 return allGroups;
467 }
468 }
469
470
471
472
473
474
475
476
477
478
479 public IndexDataReadResult readIndex(final IndexDataReadVisitor visitor, final IndexingContext context)
480 throws IOException {
481 dis.readByte();
482
483 long timestamp = dis.readLong();
484
485 Date date = null;
486
487 if (timestamp != -1) {
488 date = new Date(timestamp);
489 }
490
491 int n = 0;
492
493 Document doc;
494 while ((doc = readDocument()) != null) {
495 visitor.visitDocument(IndexUtils.updateDocument(doc, context, false));
496
497 n++;
498 }
499
500 IndexDataReadResult result = new IndexDataReadResult();
501 result.setDocumentCount(n);
502 result.setTimestamp(date);
503 return result;
504 }
505
506
507
508
509 public interface IndexDataReadVisitor {
510
511
512
513
514
515
516 void visitDocument(Document document);
517 }
518 }