NebulaGraph Java Client 5.0.0
Loading...
Searching...
No Matches
NebulaClient.java
1package com.vesoft.nebula.driver.graph.net;
2
3import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_BATCH_SIZE;
4import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_CONNECT_TIMEOUT_MS;
5import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_ENABLE_TLS;
6import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_MAX_TIMEOUT_MS;
7import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_PING_TIMEOUT_MS;
8import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_REQUEST_TIMEOUT_MS;
9import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_SCAN_PARALLEL;
10import static com.vesoft.nebula.driver.graph.net.Constants.DEFAULT_TLS_PEER_NAME_VERIFY;
11
12import com.vesoft.nebula.driver.graph.data.HostAddress;
13import com.vesoft.nebula.driver.graph.data.ResultSet;
14import com.vesoft.nebula.driver.graph.data.ValueWrapper;
15import com.vesoft.nebula.driver.graph.exception.AuthFailedException;
16import com.vesoft.nebula.driver.graph.exception.IOErrorException;
17import com.vesoft.nebula.driver.graph.scan.ScanEdgeResultIterator;
18import com.vesoft.nebula.driver.graph.scan.ScanNodeResultIterator;
19import com.vesoft.nebula.driver.graph.utils.AddressUtil;
20import com.vesoft.nebula.driver.graph.utils.GqlUtil;
21import com.vesoft.nebula.driver.graph.utils.TlsUtil;
22import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts;
23import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
24import java.io.Serializable;
25import java.net.UnknownHostException;
26import java.util.ArrayList;
27import java.util.Collections;
28import java.util.HashMap;
29import java.util.List;
30import java.util.Map;
31import java.util.stream.Collectors;
32import javax.net.ssl.SSLException;
33import org.slf4j.Logger;
34import org.slf4j.LoggerFactory;
35
39public class NebulaClient implements Serializable {
40 private final Logger logger = LoggerFactory.getLogger(this.getClass());
41
42 private List<HostAddress> servers;
43 private Builder builder;
44 private GrpcConnection connection;
45 private long sessionId;
46
47 private boolean isClosed = false;
48
49 public static Builder builder(String addresses, String userName, String password) {
50 return new Builder(addresses, userName, password);
51 }
52
53 public static Builder builder(String addresses, String userName) {
54 return new Builder(addresses, userName, null);
55 }
56
57 private NebulaClient(NebulaClient.Builder builder)
58 throws AuthFailedException, IOErrorException {
59 this.servers = builder.address;
60 this.builder = builder;
61 initClient();
62 }
63
64
65 public ResultSet execute(String gql) throws IOErrorException {
66 return execute(gql, builder.requestTimeoutMills);
67 }
68
69 public synchronized ResultSet execute(String gql, long requestTimeout) throws IOErrorException {
70 checkClosed();
71 return new ResultSet(connection.execute(sessionId, gql, requestTimeout));
72 }
73
79 public long getSessionId() {
80 return sessionId;
81 }
82
88 public String getHost() {
89 return connection.getServerAddress().toString();
90 }
91
92 public long getConnectTimeoutMills() {
93 return builder.connectTimeoutMills;
94 }
95
96 public long getRequestTimeoutMills() {
97 return builder.requestTimeoutMills;
98 }
99
100 public long getScanParallel() {
101 return builder.scanParallel;
102 }
103
111 public synchronized boolean ping(long timeoutMs) {
112 checkClosed();
113 try {
114 return connection.ping(sessionId, timeoutMs);
115 } catch (IOErrorException e) {
116 logger.error("ping error for host {}", getHost(), e);
117 return false;
118 }
119 }
120
121 public boolean ping() {
122 return ping(DEFAULT_PING_TIMEOUT_MS);
123 }
124
128 public synchronized void close() {
129 if (!isClosed) {
130 isClosed = true;
131 if (connection != null) {
132 try {
133 connection.execute(sessionId, "SESSION CLOSE");
134 connection.close();
135 } catch (Exception e) {
136 logger.warn("signout failed,", e);
137 }
138 }
139 connection = null;
140 }
141 }
142
148 public boolean isClosed() {
149 return isClosed;
150 }
151
157 private void checkClosed() {
158 if (isClosed) {
159 throw new RuntimeException("The NebulaClient already closed.");
160 }
161 }
162
163 private void initClient() throws AuthFailedException, IOErrorException {
164 // create connection with NebulaGraph Server
165 AuthResult authResult = null;
166 connection = new GrpcConnection();
167 int tryConnectTimes = servers.size();
168 Collections.shuffle(servers);
169 while (tryConnectTimes-- > 0) {
170 try {
171 connection.open(servers.get(tryConnectTimes), builder);
172 authResult = connection.authenticate(builder.userName, builder.authOptions);
173 sessionId = authResult.getSessionId();
174 break;
175 } catch (AuthFailedException e) {
176 logger.error("create NebulaClient failed.", e);
177 throw e;
178 } catch (Exception e) {
179 if (tryConnectTimes == 0) {
180 logger.error("create NebulaClient failed.", e);
181 throw e;
182 }
183 }
184 }
185 }
186
187
188 public ScanNodeResultIterator scanNode(String graphName, String nodeType) {
189 List<Integer> parts = getAllParts();
190 return scanNode(graphName, nodeType, new ArrayList<>(), true, parts, DEFAULT_BATCH_SIZE);
191 }
192
193 public ScanNodeResultIterator scanNode(String graphName,
194 String nodeType,
195 List<String> returnProperties) {
196 List<Integer> parts = getAllParts();
197 boolean allProperties = false;
198 if (returnProperties == null) {
199 allProperties = true;
200 }
201 return scanNode(graphName, nodeType, returnProperties,
202 allProperties, parts, DEFAULT_BATCH_SIZE);
203 }
204
218 public ScanNodeResultIterator scanNode(String graphName,
219 String nodeType,
220 List<String> returnProperties,
221 int batchSize) {
222 List<Integer> parts = getAllParts();
223 boolean allProperties = false;
224 if (returnProperties == null) {
225 allProperties = true;
226 }
227 return scanNode(graphName, nodeType, returnProperties, allProperties, parts, batchSize);
228 }
229
244 public ScanNodeResultIterator scanNode(String graphName,
245 String nodeType,
246 List<String> returnProperties,
247 int part,
248 int batchSize) {
249 return scanNode(graphName, nodeType, returnProperties,
250 Collections.singletonList(part), batchSize);
251 }
252
253
268 public ScanNodeResultIterator scanNode(String graphName,
269 String nodeType,
270 List<String> returnProperties,
271 List<Integer> parts,
272 int batchSize) {
273 boolean allProperties = false;
274 if (returnProperties == null) {
275 allProperties = true;
276 }
277 return scanNode(graphName, nodeType, returnProperties, allProperties, parts, batchSize);
278 }
279
280
296 private ScanNodeResultIterator scanNode(String graphName,
297 String nodeType,
298 List<String> returnProperties,
299 boolean allProperties,
300 List<Integer> parts,
301 int batchSize) {
302 // get node type's all property names
303 List<String> nodeProperties = null;
304 try {
305 nodeProperties = getNodeProperties(graphName, nodeType);
306 } catch (IOErrorException e) {
307 logger.error("get node schema failed.", e);
308 throw new RuntimeException(e);
309 }
310
311 // construct the return property list for scan
312 List<String> propertyList = new ArrayList<>();
313 if (allProperties) {
314 propertyList.addAll(nodeProperties);
315 } else {
316 String primaryKey = nodeProperties.get(0);
317 // put the primary key always on the head of the propertyList for scan
318 propertyList.add(primaryKey);
319 for (String propName : returnProperties) {
320 if (propName.trim().equals(primaryKey)) {
321 continue;
322 }
323 propertyList.add(propName);
324 }
325 }
326
327 return new ScanNodeResultIterator(graphName,
328 nodeType,
329 propertyList,
330 parts,
331 batchSize,
332 builder.scanParallel,
333 servers,
334 builder.userName,
335 builder.authOptions,
336 builder.requestTimeoutMills);
337 }
338
339
340 public ScanEdgeResultIterator scanEdge(String graphName, String edgeType) {
341 List<Integer> parts = getAllParts();
342 return scanEdge(graphName, edgeType, new ArrayList<>(), true, parts, DEFAULT_BATCH_SIZE);
343 }
344
345 public ScanEdgeResultIterator scanEdge(String graphName,
346 String edgeType,
347 List<String> returnProperties) {
348 List<Integer> parts = getAllParts();
349 boolean allProperties = false;
350 if (returnProperties == null) {
351 allProperties = true;
352 }
353 return scanEdge(graphName, edgeType, returnProperties,
354 allProperties, parts, DEFAULT_BATCH_SIZE);
355 }
356
370 public ScanEdgeResultIterator scanEdge(String graphName,
371 String edgeType,
372 List<String> returnProperties,
373 int batchSize) {
374 boolean allProperties = false;
375 if (returnProperties == null) {
376 allProperties = true;
377 }
378 List<Integer> parts = getAllParts();
379 return scanEdge(graphName, edgeType, returnProperties, allProperties,
380 parts, batchSize);
381 }
382
383
397 public ScanEdgeResultIterator scanEdge(String graphName,
398 String edgeType,
399 List<String> returnProperties,
400 int part,
401 int batchSize) {
402 boolean allProperties = false;
403 if (returnProperties == null) {
404 allProperties = true;
405 }
406 return scanEdge(
407 graphName,
408 edgeType,
409 returnProperties,
410 allProperties,
411 Collections.singletonList(part),
412 batchSize);
413 }
414
415
429 public ScanEdgeResultIterator scanEdge(String graphName,
430 String edgeType,
431 List<String> returnProperties,
432 List<Integer> parts,
433 int batchSize) {
434 boolean allProperties = false;
435 if (returnProperties == null) {
436 allProperties = true;
437 }
438 return scanEdge(graphName, edgeType, returnProperties, false, parts, batchSize);
439 }
440
441
456 private ScanEdgeResultIterator scanEdge(String graphName,
457 String edgeType,
458 List<String> returnProperties,
459 boolean allProperties,
460 List<Integer> parts,
461 int batchSize) {
462 // get edge type's all property names
463 List<String> edgeProperties = null;
464 try {
465 edgeProperties = getEdgeProperties(graphName, edgeType);
466 } catch (IOErrorException e) {
467 logger.error("get node schema failed.", e);
468 throw new RuntimeException(e);
469 }
470
471 // construct the return property list for scan
472 List<String> propertyList = new ArrayList<>();
473 if (allProperties) {
474 propertyList.addAll(edgeProperties);
475 } else {
476 propertyList.addAll(returnProperties);
477 }
478 return new ScanEdgeResultIterator(graphName,
479 edgeType,
480 propertyList,
481 parts,
482 batchSize,
483 builder.scanParallel,
484 servers,
485 builder.userName,
486 builder.authOptions,
487 builder.requestTimeoutMills);
488 }
489
490
496 private List<Integer> getAllParts() {
497 String showPartitions = "CALL show_partitions() RETURN *";
498 ResultSet resultSet;
499 try {
500 resultSet = execute(showPartitions);
501 } catch (Exception e) {
502 logger.error("get all partitions error", e);
503 throw new RuntimeException("get all partitions error", e);
504 }
505 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
506 logger.error("get all partitions failed for {}", resultSet.getErrorMessage());
507 throw new RuntimeException(
508 "get all partitions failed for " + resultSet.getErrorMessage());
509 }
510
511 List<Integer> partitions = new ArrayList<>();
512 while (resultSet.hasNext()) {
513 partitions.add(resultSet.next().get("partition_id").asInt());
514 }
515 if (partitions.contains(0)) {
516 partitions.remove(Integer.valueOf(0));
517 }
518 return partitions;
519 }
520
521
529 private List<String> getNodeProperties(String graphName, String nodeType)
530 throws IOErrorException {
531 String graphType = getGraphType(graphName);
532 String descNodeType = String.format("DESCRIBE NODE TYPE `%s` OF `%s`",
533 GqlUtil.escape(nodeType),
534 GqlUtil.escape(graphType));
535 ResultSet resultSet = execute(descNodeType);
536 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
537 logger.error(String.format("get description of %s failed for %s", nodeType,
538 resultSet.getErrorMessage()));
539 throw new IllegalArgumentException(String.format("node type %s does not exist in %s",
540 nodeType, graphName));
541 }
542
543 List<String> pks = new ArrayList<>();
544 List<String> propNames = new ArrayList<>();
545 while (resultSet.hasNext()) {
546 ResultSet.Record record = resultSet.next();
547 propNames.add(record.get("property_name").asString());
548 if ("Y".equals(record.get("primary_key").asString())) {
549 pks.add(record.get("property_name").asString());
550 }
551 }
552
553 if (pks.isEmpty()) {
554 logger.error("node type " + nodeType + " has no primary key.");
555 throw new RuntimeException("node type " + nodeType + " has no primary key");
556 }
557
558 // define the property name list, and put the pk on the head of list.
559 List<String> propertyNames = new ArrayList<>(pks);
560 for (String property : propNames) {
561 if (pks.contains(property)) {
562 continue;
563 }
564 propertyNames.add(property);
565 }
566 return propertyNames;
567 }
568
569
577 private List<String> getEdgeProperties(String graphName, String edgeType)
578 throws IOErrorException {
579 String graphType = getGraphType(graphName);
580
581 String descEdgeType = String.format(
582 "CALL describe_graph_type(\"%s\") FILTER type_name=\"%s\" return properties",
583 GqlUtil.escape(graphType), GqlUtil.escape(edgeType));
584 ResultSet resultSet = execute(descEdgeType);
585 if (!resultSet.isSucceeded() || resultSet.isEmpty()) {
586 logger.error(String.format("get description of %s failed for %s", edgeType,
587 resultSet.getErrorMessage()));
588 throw new IllegalArgumentException(String.format("edge type %s does not exist in %s",
589 edgeType, graphName));
590 }
591
592 List<ValueWrapper> properties = resultSet.next().get("properties").asList();
593 return properties.stream().map(ValueWrapper::asString).collect(Collectors.toList());
594 }
595
596
603 private String getGraphType(String graphName) throws IOErrorException {
604 ResultSet resultSet = execute(String.format("DESCRIBE GRAPH `%s`",
605 GqlUtil.escape(graphName)));
606 String graphType;
607 if (resultSet.isSucceeded() && !resultSet.isEmpty()) {
608 graphType = resultSet.next().values().get(1).asString();
609 } else {
610 throw new IllegalArgumentException("graphName " + graphName + " does not exist.");
611 }
612 return graphType;
613 }
614
615 public static class Builder {
616 protected final List<HostAddress> address;
617 protected final String userName;
618 protected final String password;
619 protected Map<String, Object> authOptions = new HashMap<>();
620 protected long connectTimeoutMills = DEFAULT_CONNECT_TIMEOUT_MS;
621 // ms timeout for rpc request, the value must be larger than 0, smaller than 100000001000L
622 protected long requestTimeoutMills = DEFAULT_REQUEST_TIMEOUT_MS;
623 protected int scanParallel = DEFAULT_SCAN_PARALLEL;
624 protected boolean enableTls = DEFAULT_ENABLE_TLS;
625 protected String tlsCa;
626 protected String tlsCert;
627 protected String tlsKey;
628 protected boolean tlsPeerNameVerify = DEFAULT_TLS_PEER_NAME_VERIFY;
629 protected String tlsPeerName;
630
638 public Builder(String addresses, String userName, String password) {
639 try {
640 this.address = AddressUtil.validateAddress(addresses);
641 } catch (UnknownHostException e) {
642 throw new RuntimeException(e);
643 }
644 this.userName = userName;
645 this.password = password;
646 }
647
654 public Builder withAuthOptions(Map<String, Object> authOptions) {
655 if (authOptions != null) {
656 this.authOptions.putAll(authOptions);
657 }
658 return this;
659 }
660
668 public Builder withConnectTimeoutMills(long connectTimeoutMills) {
669 if (connectTimeoutMills <= 0 || connectTimeoutMills > DEFAULT_MAX_TIMEOUT_MS) {
670 this.connectTimeoutMills = DEFAULT_MAX_TIMEOUT_MS;
671 } else {
672 this.connectTimeoutMills = connectTimeoutMills;
673 }
674 return this;
675 }
676
684 public Builder withRequestTimeoutMills(long requestTimeoutMills) {
685 if (requestTimeoutMills < 0 || requestTimeoutMills > DEFAULT_MAX_TIMEOUT_MS) {
686 this.requestTimeoutMills = DEFAULT_MAX_TIMEOUT_MS;
687 } else {
688 this.requestTimeoutMills = requestTimeoutMills;
689 }
690 return this;
691 }
692
699 public Builder withScanParallel(int scanParallel) {
700 this.scanParallel = scanParallel;
701 return this;
702 }
703
710 public Builder withEnableTls(boolean enableTls) {
711 this.enableTls = enableTls;
712 return this;
713 }
714
721 public Builder withTlsCa(String ca) {
722 this.tlsCa = ca;
723 return this;
724 }
725
733 public Builder withTlsCert(String cert, String key) {
734 this.tlsCert = cert;
735 this.tlsKey = key;
736 return this;
737 }
738
739
747 public Builder withTlsPeerName(String tlsPeerName) {
748 this.tlsPeerName = tlsPeerName;
749 return this;
750 }
751
752 public void check() {
753 if (address == null) {
754 throw new IllegalArgumentException("Graph addresses cannot be empty.");
755 }
756
757 if (enableTls && tlsCa == null) {
758 throw new IllegalArgumentException("TLS is enable, tlsCa cannot be empty.");
759 }
760 if (enableTls && tlsPeerNameVerify && tlsPeerName == null) {
761 throw new IllegalArgumentException(
762 "TLS is enable, tlsPeerName cannot be empty.");
763 }
764 }
765
771 public NebulaClient build() throws AuthFailedException, IOErrorException {
772 check();
773 if (password != null) {
774 authOptions.put("password", password);
775 }
776 return new NebulaClient(this);
777 }
778
779 }
780}
Client that connects to NebulaGraph and sends requests to it.
synchronized void close()
Releases and closes the connection with the NebulaGraph server.
ScanEdgeResultIterator scanEdge(String graphName, String edgeType, List< String > returnProperties, List< Integer > parts, int batchSize)
scan the data of specific edgeType
ScanNodeResultIterator scanNode(String graphName, String nodeType, List< String > returnProperties, List< Integer > parts, int batchSize)
Scans the data of a specific nodeType; the result will contain the primary key and the specified return properties...
ScanEdgeResultIterator scanEdge(String graphName, String edgeType, List< String > returnProperties, int batchSize)
Scans the data of a specific edgeType; the result will contain the src node's primary key,...
synchronized boolean ping(long timeoutMs)
ping the NebulaGraph server
ScanNodeResultIterator scanNode(String graphName, String nodeType, List< String > returnProperties, int part, int batchSize)
Scans the data of a specific nodeType; the result will contain the primary key and the specified return properties...
String getHost()
get the NebulaGraph host address that this client is connecting to.
ScanEdgeResultIterator scanEdge(String graphName, String edgeType, List< String > returnProperties, int part, int batchSize)
scan the data of specific edgeType
ScanNodeResultIterator scanNode(String graphName, String nodeType, List< String > returnProperties, int batchSize)
scan the data of specific nodeType.
long getSessionId()
get the SessionId of the Client