NebulaGraph Java Client 5.0.0
Loading...
Searching...
No Matches
ScanNodeResultIterator.java
1package com.vesoft.nebula.driver.graph.scan;
2
3import com.vesoft.nebula.driver.graph.data.HostAddress;
4import com.vesoft.nebula.driver.graph.data.ResultSet;
5import java.util.ArrayList;
6import java.util.Collections;
7import java.util.List;
8import java.util.Map;
9import java.util.NoSuchElementException;
10import java.util.concurrent.CountDownLatch;
11import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
15 private static final Logger logger = LoggerFactory.getLogger(ScanNodeResultIterator.class);
16
17 private static final String SCAN_NODE_TEMPLATE =
18 "USE `%s` CALL cursor_node_scan(\"%s\",\"%s\",%s,%d,\"%s\", %d) return *";
19
20 public ScanNodeResultIterator(String graphName,
21 String label,
22 List<String> propNames,
23 List<Integer> parts,
24 int batchSize,
25 int parallel,
26 List<HostAddress> servers,
27 String userName,
28 Map<String, Object> authOptions,
29 long requestTimeout) {
30 super(graphName, label, propNames, parts, batchSize,
31 parallel, servers, userName, authOptions, requestTimeout);
32 }
33
34
35 public ScanNodeResult next() {
36 if (!hasNext) {
37 throw new NoSuchElementException("iterator has no more data");
38 }
39 final List<ResultSet> results =
40 Collections.synchronizedList(new ArrayList<>(partCursor.size()));
41 List<Exception> exceptions =
42 Collections.synchronizedList(new ArrayList<>(partCursor.size()));
43 CountDownLatch countDownLatch = new CountDownLatch(partCursor.size());
44 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
45 threadPool.submit(() -> {
46 try {
47 ResultSet result = scan(SCAN_NODE_TEMPLATE, partCur);
48 // collect results and update the cursor
49 if (result.isSucceeded()) {
50 String cursor = getCursor(result);
51 partCursor.put(partCur.getKey(), cursor);
52 results.add(result);
53 } else {
54 logger.error(String.format("Scan part %d of node %s failed for %s, "
55 + "scan again in the next next()",
56 partCur.getKey(),
57 labelName,
58 result.getErrorMessage()));
59 exceptions.add(new Exception(String.format("part %d of %s scan error: ",
60 partCur.getKey(), labelName, result.getErrorMessage())));
61 }
62 } catch (Exception e) {
63 logger.error(String.format("Scan node error for %s", e.getMessage()), e);
64 exceptions.add(new Exception(String.format("part %d of %s scan failed: %s",
65 partCur.getKey(), labelName, e.getMessage()), e));
66 } finally {
67 countDownLatch.countDown();
68 }
69 });
70 }
71
72 try {
73 countDownLatch.await();
74 } catch (InterruptedException interruptedException) {
75 logger.error("scan interrupted:", interruptedException);
76 throw new RuntimeException("scan interrupted", interruptedException);
77 }
78
79 // As long as one part fails, the current iteration is considered as failed.
80 if (!exceptions.isEmpty()) {
81 List<String> exceptionMsg = new ArrayList<>();
82 for (Exception e : exceptions) {
83 exceptionMsg.add(e.getMessage());
84 }
85 throw new RuntimeException("scan node failed for current iterator: " + exceptionMsg);
86 }
87
88 hasNext = false;
89 List<Integer> partKeyNeedToRemove = new ArrayList<>();
90 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
91 if (!"".equals(partCur.getValue())) {
92 hasNext = true;
93 break;
94 } else {
95 partKeyNeedToRemove.add(partCur.getKey());
96 }
97 }
98 for (Integer part : partKeyNeedToRemove) {
99 partCursor.remove(part);
100 }
101 if (!hasNext && !threadPool.isShutdown()) {
102 threadPool.shutdown();
103 }
104 if (!hasNext) {
105 close();
106 }
107 return new ScanNodeResult(results, propNames);
108 }
109
110}