NebulaGraph Java Client 5.0.0
Loading...
Searching...
No Matches
ScanEdgeResultIterator.java
1package com.vesoft.nebula.driver.graph.scan;
2
3import com.vesoft.nebula.driver.graph.data.HostAddress;
4import com.vesoft.nebula.driver.graph.data.ResultSet;
5import java.util.ArrayList;
6import java.util.Collections;
7import java.util.List;
8import java.util.Map;
9import java.util.NoSuchElementException;
10import java.util.concurrent.CountDownLatch;
11import org.slf4j.Logger;
12import org.slf4j.LoggerFactory;
13
15 private static final Logger logger = LoggerFactory.getLogger(ScanNodeResultIterator.class);
16
17 private static final String SCAN_EDGE_TEMPLATE =
18 "USE `%s` CALL cursor_edge_scan(\"%s\",\"%s\",%s,%d,\"%s\", %d) return *";
19
/**
 * Creates an edge-scan iterator; every argument is forwarded unchanged to the superclass
 * constructor, which owns all of the iterator state.
 *
 * <p>NOTE(review): the parameter descriptions below are inferred from the names only;
 * confirm them against the superclass.
 *
 * @param graphName        name of the graph to scan
 * @param label            edge label to scan
 * @param propNames        property names to return
 * @param parts            parts to scan
 * @param batchSize        batch size of each scan request
 * @param parallel         degree of parallelism
 * @param servers          server addresses to connect to
 * @param userName         user name used for authentication
 * @param authOptions      additional authentication options
 * @param requestTimeoutMs request timeout in milliseconds
 */
public ScanEdgeResultIterator(
        String graphName,
        String label,
        List<String> propNames,
        List<Integer> parts,
        int batchSize,
        int parallel,
        List<HostAddress> servers,
        String userName,
        Map<String, Object> authOptions,
        long requestTimeoutMs) {
    super(
            graphName,
            label,
            propNames,
            parts,
            batchSize,
            parallel,
            servers,
            userName,
            authOptions,
            requestTimeoutMs);
}
33
34
35 public ScanEdgeResult next() {
36 if (!hasNext) {
37 throw new NoSuchElementException("iterator has no more data");
38 }
39 final List<ResultSet> results =
40 Collections.synchronizedList(new ArrayList<>(partCursor.size()));
41 List<Exception> exceptions =
42 Collections.synchronizedList(new ArrayList<>(partCursor.size()));
43 CountDownLatch countDownLatch = new CountDownLatch(partCursor.size());
44 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
45 threadPool.submit(() -> {
46 try {
47 ResultSet result = scan(SCAN_EDGE_TEMPLATE, partCur);
48 // collect results and update the cursor
49 if (result.isSucceeded()) {
50 String cursor = getCursor(result);
51 partCursor.put(partCur.getKey(), cursor);
52 results.add(result);
53 } else {
54 logger.error(String.format("Scan part %d of edge %s failed for %s, "
55 + "scan again in the next next()",
56 partCur.getKey(),
57 labelName,
58 result.getErrorMessage()));
59 exceptions.add(new Exception(String.format("part %d of %s scan error: %s",
60 partCur.getKey(), labelName, result.getErrorMessage())));
61 }
62 } catch (Exception e) {
63 logger.error(String.format("Scan node error for %s", e.getMessage()), e);
64 exceptions.add(new Exception(String.format("part %d of %s scan failed: %s",
65 partCur.getKey(), labelName, e.getMessage()), e));
66 } finally {
67 countDownLatch.countDown();
68 }
69 });
70 }
71
72 try {
73 countDownLatch.await();
74 } catch (InterruptedException interruptedException) {
75 logger.error("scan interrupted:", interruptedException);
76 throw new RuntimeException("scan interrupted", interruptedException);
77 }
78
79 // As long as one part fails, the current iteration is considered as failed.
80 if (!exceptions.isEmpty()) {
81 List<String> exceptionMsg = new ArrayList<>();
82 for (Exception e : exceptions) {
83 exceptionMsg.add(e.getMessage());
84 }
85 throw new RuntimeException("scan node failed for current iterator: " + exceptionMsg);
86 }
87
88 // check if the iterator of part has more data
89 hasNext = false;
90 List<Integer> partKeyNeedToRemove = new ArrayList<>();
91 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
92 if (!"".equals(partCur.getValue())) {
93 hasNext = true;
94 break;
95 } else {
96 partKeyNeedToRemove.add(partCur.getKey());
97 }
98 }
99 for (Integer part : partKeyNeedToRemove) {
100 partCursor.remove(part);
101 }
102 if (!hasNext && !threadPool.isShutdown()) {
103 threadPool.shutdown();
104 }
105 if (!hasNext) {
106 close();
107 }
108 return new ScanEdgeResult(results, propNames);
109 }
110
111}