// Statement template handed to scan(...) once per partition cursor.
// It holds seven format placeholders: one back-quoted %s after USE, followed by
// the six cursor_node_scan arguments (two quoted %s, an unquoted %s, a %d,
// a quoted %s and a %d).
// NOTE(review): the values bound to each placeholder are chosen inside
// scan(...), which is not visible here — confirm there before reordering.
17 private static final String SCAN_NODE_TEMPLATE =
18 "USE `%s` CALL cursor_node_scan(\"%s\",\"%s\",%s,%d,\"%s\", %d) return *";
22 List<String> propNames,
26 List<HostAddress> servers,
28 Map<String, Object> authOptions,
29 long requestTimeout) {
30 super(graphName, label, propNames, parts, batchSize,
31 parallel, servers, userName, authOptions, requestTimeout);
37 throw new NoSuchElementException(
"iterator has no more data");
39 final List<ResultSet> results =
40 Collections.synchronizedList(
new ArrayList<>(partCursor.size()));
41 List<Exception> exceptions =
42 Collections.synchronizedList(
new ArrayList<>(partCursor.size()));
43 CountDownLatch countDownLatch =
new CountDownLatch(partCursor.size());
44 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
45 threadPool.submit(() -> {
47 ResultSet result = scan(SCAN_NODE_TEMPLATE, partCur);
49 if (result.isSucceeded()) {
50 String cursor = getCursor(result);
51 partCursor.put(partCur.getKey(), cursor);
54 logger.error(String.format(
"Scan part %d of node %s failed for %s, "
55 +
"scan again in the next next()",
58 result.getErrorMessage()));
// Record the failure for the aggregated report built after all parts finish.
// The format string needs a third placeholder so that the result's error
// message actually appears in the exception text instead of being ignored.
exceptions.add(
        new Exception(String.format(
                "part %d of %s scan error: %s",
                partCur.getKey(), labelName, result.getErrorMessage())));
62 }
catch (Exception e) {
63 logger.error(String.format(
"Scan node error for %s", e.getMessage()), e);
64 exceptions.add(
new Exception(String.format(
"part %d of %s scan failed: %s",
65 partCur.getKey(), labelName, e.getMessage()), e));
67 countDownLatch.countDown();
73 countDownLatch.await();
74 }
catch (InterruptedException interruptedException) {
logger.error(
        "scan interrupted:", interruptedException);
// Catching InterruptedException clears the thread's interrupt flag; restore
// it before rethrowing so code further up the stack can still observe it.
Thread.currentThread().interrupt();
throw new RuntimeException(
        "scan interrupted", interruptedException);
80 if (!exceptions.isEmpty()) {
81 List<String> exceptionMsg =
new ArrayList<>();
82 for (Exception e : exceptions) {
83 exceptionMsg.add(e.getMessage());
85 throw new RuntimeException(
"scan node failed for current iterator: " + exceptionMsg);
89 List<Integer> partKeyNeedToRemove =
new ArrayList<>();
90 for (Map.Entry<Integer, String> partCur : partCursor.entrySet()) {
91 if (!
"".equals(partCur.getValue())) {
95 partKeyNeedToRemove.add(partCur.getKey());
98 for (Integer part : partKeyNeedToRemove) {
99 partCursor.remove(part);
101 if (!hasNext && !threadPool.isShutdown()) {
102 threadPool.shutdown();