40 private final Logger logger = LoggerFactory.getLogger(this.getClass());
42 private List<HostAddress> servers;
43 private Builder builder;
44 private GrpcConnection connection;
45 private long sessionId;
47 private boolean isClosed =
false;
49 public static Builder builder(String addresses, String userName, String password) {
50 return new Builder(addresses, userName, password);
53 public static Builder builder(String addresses, String userName) {
54 return new Builder(addresses, userName,
null);
58 throws AuthFailedException, IOErrorException {
59 this.servers = builder.address;
60 this.builder = builder;
65 public ResultSet execute(String gql)
throws IOErrorException {
66 return execute(gql, builder.requestTimeoutMills);
69 public synchronized ResultSet execute(String gql,
long requestTimeout)
throws IOErrorException {
71 return new ResultSet(connection.execute(sessionId, gql, requestTimeout));
89 return connection.getServerAddress().toString();
92 public long getConnectTimeoutMills() {
93 return builder.connectTimeoutMills;
96 public long getRequestTimeoutMills() {
97 return builder.requestTimeoutMills;
100 public long getScanParallel() {
101 return builder.scanParallel;
111 public synchronized boolean ping(
long timeoutMs) {
114 return connection.ping(sessionId, timeoutMs);
115 }
catch (IOErrorException e) {
116 logger.error(
"ping error for host {}",
getHost(), e);
121 public boolean ping() {
122 return ping(DEFAULT_PING_TIMEOUT_MS);
131 if (connection !=
null) {
133 connection.execute(sessionId,
"SESSION CLOSE");
135 }
catch (Exception e) {
136 logger.warn(
"signout failed,", e);
157 private void checkClosed() {
159 throw new RuntimeException(
"The NebulaClient already closed.");
163 private void initClient() throws AuthFailedException, IOErrorException {
165 AuthResult authResult =
null;
166 connection =
new GrpcConnection();
167 int tryConnectTimes = servers.size();
168 Collections.shuffle(servers);
169 while (tryConnectTimes-- > 0) {
171 connection.open(servers.get(tryConnectTimes), builder);
172 authResult = connection.authenticate(builder.userName, builder.authOptions);
173 sessionId = authResult.getSessionId();
175 }
catch (AuthFailedException e) {
176 logger.error(
"create NebulaClient failed.", e);
178 }
catch (Exception e) {
179 if (tryConnectTimes == 0) {
180 logger.error(
"create NebulaClient failed.", e);
188 public ScanNodeResultIterator scanNode(String graphName, String nodeType) {
189 List<Integer> parts = getAllParts();
190 return scanNode(graphName, nodeType,
new ArrayList<>(),
true, parts, DEFAULT_BATCH_SIZE);
193 public ScanNodeResultIterator scanNode(String graphName,
195 List<String> returnProperties) {
196 List<Integer> parts = getAllParts();
197 boolean allProperties =
false;
198 if (returnProperties ==
null) {
199 allProperties =
true;
201 return scanNode(graphName, nodeType, returnProperties,
202 allProperties, parts, DEFAULT_BATCH_SIZE);
218 public ScanNodeResultIterator
scanNode(String graphName,
220 List<String> returnProperties,
222 List<Integer> parts = getAllParts();
223 boolean allProperties =
false;
224 if (returnProperties ==
null) {
225 allProperties =
true;
227 return scanNode(graphName, nodeType, returnProperties, allProperties, parts, batchSize);
244 public ScanNodeResultIterator
scanNode(String graphName,
246 List<String> returnProperties,
249 return scanNode(graphName, nodeType, returnProperties,
250 Collections.singletonList(part), batchSize);
268 public ScanNodeResultIterator
scanNode(String graphName,
270 List<String> returnProperties,
273 boolean allProperties =
false;
274 if (returnProperties ==
null) {
275 allProperties =
true;
277 return scanNode(graphName, nodeType, returnProperties, allProperties, parts, batchSize);
296 private ScanNodeResultIterator scanNode(String graphName,
298 List<String> returnProperties,
299 boolean allProperties,
303 List<String> nodeProperties =
null;
305 nodeProperties = getNodeProperties(graphName, nodeType);
306 }
catch (IOErrorException e) {
307 logger.error(
"get node schema failed.", e);
308 throw new RuntimeException(e);
312 List<String> propertyList =
new ArrayList<>();
314 propertyList.addAll(nodeProperties);
316 String primaryKey = nodeProperties.get(0);
318 propertyList.add(primaryKey);
319 for (String propName : returnProperties) {
320 if (propName.trim().equals(primaryKey)) {
323 propertyList.add(propName);
327 return new ScanNodeResultIterator(graphName,
332 builder.scanParallel,
336 builder.requestTimeoutMills);
340 public ScanEdgeResultIterator scanEdge(String graphName, String edgeType) {
341 List<Integer> parts = getAllParts();
342 return scanEdge(graphName, edgeType,
new ArrayList<>(),
true, parts, DEFAULT_BATCH_SIZE);
345 public ScanEdgeResultIterator scanEdge(String graphName,
347 List<String> returnProperties) {
348 List<Integer> parts = getAllParts();
349 boolean allProperties =
false;
350 if (returnProperties ==
null) {
351 allProperties =
true;
353 return scanEdge(graphName, edgeType, returnProperties,
354 allProperties, parts, DEFAULT_BATCH_SIZE);
370 public ScanEdgeResultIterator
scanEdge(String graphName,
372 List<String> returnProperties,
374 boolean allProperties =
false;
375 if (returnProperties ==
null) {
376 allProperties =
true;
378 List<Integer> parts = getAllParts();
379 return scanEdge(graphName, edgeType, returnProperties, allProperties,
397 public ScanEdgeResultIterator
scanEdge(String graphName,
399 List<String> returnProperties,
402 boolean allProperties =
false;
403 if (returnProperties ==
null) {
404 allProperties =
true;
411 Collections.singletonList(part),
429 public ScanEdgeResultIterator
scanEdge(String graphName,
431 List<String> returnProperties,
434 boolean allProperties =
false;
435 if (returnProperties ==
null) {
436 allProperties =
true;
438 return scanEdge(graphName, edgeType, returnProperties,
false, parts, batchSize);
456 private ScanEdgeResultIterator scanEdge(String graphName,
458 List<String> returnProperties,
459 boolean allProperties,
463 List<String> edgeProperties =
null;
465 edgeProperties = getEdgeProperties(graphName, edgeType);
466 }
catch (IOErrorException e) {
467 logger.error(
"get node schema failed.", e);
468 throw new RuntimeException(e);
472 List<String> propertyList =
new ArrayList<>();
474 propertyList.addAll(edgeProperties);
476 propertyList.addAll(returnProperties);
478 return new ScanEdgeResultIterator(graphName,
483 builder.scanParallel,
487 builder.requestTimeoutMills);
496 private List<Integer> getAllParts() {
497 String showPartitions =
"CALL show_partitions() RETURN *";
500 resultSet = execute(showPartitions);
501 }
catch (Exception e) {
502 logger.error(
"get all partitions error", e);
503 throw new RuntimeException(
"get all partitions error", e);
505 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
506 logger.error(
"get all partitions failed for {}", resultSet.getErrorMessage());
507 throw new RuntimeException(
508 "get all partitions failed for " + resultSet.getErrorMessage());
511 List<Integer> partitions =
new ArrayList<>();
512 while (resultSet.hasNext()) {
513 partitions.add(resultSet.next().get(
"partition_id").asInt());
515 if (partitions.contains(0)) {
516 partitions.remove(Integer.valueOf(0));
529 private List<String> getNodeProperties(String graphName, String nodeType)
530 throws IOErrorException {
531 String graphType = getGraphType(graphName);
532 String descNodeType = String.format(
"DESCRIBE NODE TYPE `%s` OF `%s`",
533 GqlUtil.escape(nodeType),
534 GqlUtil.escape(graphType));
535 ResultSet resultSet = execute(descNodeType);
536 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
537 logger.error(String.format(
"get description of %s failed for %s", nodeType,
538 resultSet.getErrorMessage()));
539 throw new IllegalArgumentException(String.format(
"node type %s does not exist in %s",
540 nodeType, graphName));
543 List<String> pks =
new ArrayList<>();
544 List<String> propNames =
new ArrayList<>();
545 while (resultSet.hasNext()) {
546 ResultSet.Record record = resultSet.next();
547 propNames.add(record.get(
"property_name").asString());
548 if (
"Y".equals(record.get(
"primary_key").asString())) {
549 pks.add(record.get(
"property_name").asString());
554 logger.error(
"node type " + nodeType +
" has no primary key.");
555 throw new RuntimeException(
"node type " + nodeType +
" has no primary key");
559 List<String> propertyNames =
new ArrayList<>(pks);
560 for (String property : propNames) {
561 if (pks.contains(property)) {
564 propertyNames.add(property);
566 return propertyNames;
577 private List<String> getEdgeProperties(String graphName, String edgeType)
578 throws IOErrorException {
579 String graphType = getGraphType(graphName);
581 String descEdgeType = String.format(
582 "CALL describe_graph_type(\"%s\") FILTER type_name=\"%s\" return properties",
583 GqlUtil.escape(graphType), GqlUtil.escape(edgeType));
584 ResultSet resultSet = execute(descEdgeType);
585 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
586 logger.error(String.format(
"get description of %s failed for %s", edgeType,
587 resultSet.getErrorMessage()));
588 throw new IllegalArgumentException(String.format(
"edge type %s does not exist in %s",
589 edgeType, graphName));
592 List<ValueWrapper> properties = resultSet.next().get(
"properties").asList();
593 return properties.stream().map(ValueWrapper::asString).collect(Collectors.toList());
603 private String getGraphType(String graphName)
throws IOErrorException {
604 ResultSet resultSet = execute(String.format(
"DESCRIBE GRAPH `%s`",
605 GqlUtil.escape(graphName)));
607 if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
608 graphType = resultSet.next().values().get(1).asString();
610 throw new IllegalArgumentException(
"graphName " + graphName +
" does not exist.");
615 public static class Builder {
616 protected final List<HostAddress> address;
617 protected final String userName;
618 protected final String password;
619 protected Map<String, Object> authOptions =
new HashMap<>();
620 protected long connectTimeoutMills = DEFAULT_CONNECT_TIMEOUT_MS;
622 protected long requestTimeoutMills = DEFAULT_REQUEST_TIMEOUT_MS;
623 protected int scanParallel = DEFAULT_SCAN_PARALLEL;
624 protected boolean enableTls = DEFAULT_ENABLE_TLS;
625 protected String tlsCa;
626 protected String tlsCert;
627 protected String tlsKey;
628 protected boolean tlsPeerNameVerify = DEFAULT_TLS_PEER_NAME_VERIFY;
629 protected String tlsPeerName;
638 public Builder(String addresses, String userName, String password) {
640 this.address = AddressUtil.validateAddress(addresses);
641 }
catch (UnknownHostException e) {
642 throw new RuntimeException(e);
644 this.userName = userName;
645 this.password = password;
654 public Builder withAuthOptions(Map<String, Object> authOptions) {
655 if (authOptions !=
null) {
656 this.authOptions.putAll(authOptions);
668 public Builder withConnectTimeoutMills(
long connectTimeoutMills) {
669 if (connectTimeoutMills <= 0 || connectTimeoutMills > DEFAULT_MAX_TIMEOUT_MS) {
670 this.connectTimeoutMills = DEFAULT_MAX_TIMEOUT_MS;
672 this.connectTimeoutMills = connectTimeoutMills;
684 public Builder withRequestTimeoutMills(
long requestTimeoutMills) {
685 if (requestTimeoutMills < 0 || requestTimeoutMills > DEFAULT_MAX_TIMEOUT_MS) {
686 this.requestTimeoutMills = DEFAULT_MAX_TIMEOUT_MS;
688 this.requestTimeoutMills = requestTimeoutMills;
699 public Builder withScanParallel(
int scanParallel) {
700 this.scanParallel = scanParallel;
710 public Builder withEnableTls(
boolean enableTls) {
711 this.enableTls = enableTls;
721 public Builder withTlsCa(String ca) {
733 public Builder withTlsCert(String cert, String key) {
747 public Builder withTlsPeerName(String tlsPeerName) {
748 this.tlsPeerName = tlsPeerName;
752 public void check() {
753 if (address ==
null) {
754 throw new IllegalArgumentException(
"Graph addresses cannot be empty.");
757 if (enableTls && tlsCa ==
null) {
758 throw new IllegalArgumentException(
"TLS is enable, tlsCa cannot be empty.");
760 if (enableTls && tlsPeerNameVerify && tlsPeerName ==
null) {
761 throw new IllegalArgumentException(
762 "TLS is enable, tlsPeerName cannot be empty.");
771 public NebulaClient build() throws AuthFailedException, IOErrorException {
773 if (password !=
null) {
774 authOptions.put(
"password", password);
776 return new NebulaClient(
this);