Offline CSV Import of Billions of Data | Best Practice Using NebulaGraph Spark Connector

Based on the actual demand and subsequent scalability, the NebulaGraph database is used for technical selection. The batch import performance in the actual business scenarios of NebulaGraph needs to be first verified.

This article shares the best practice of using NebulaGraph Spark Connector by performing import jobs via Spark on Yarn distributed tasks and with CSV files on HDFS as the data source.

About NebulaGraph Spark Connector

The following figure shows the use cases and advantages of NebulaGraph Spark Connector. For more details, see NebulaGraph Spark Connector.

Environment Information

  • Hardware environment
Name Value Recommend
Local Disk (SSD) 2 T At least 2 T
CPU 16 C * 4 128 C
Memory 128 GB 128 GB
  • Software environment
Name 版本号
NebulaGraph 3.0.0
NebulaGraph Spark Connector 3.0.0
Hadoop 2.7.2U17-10
Spark 2.4.5U5
  • Data size
Name Value
Data size 200 GB
Vertex 930 million
Edge 970 million

Deployment Plan

There are roughly three steps for the deployment.

  1. Download the RPM package of NebulaGraph and install it.
  2. Modify configuration files in batch.
  3. Start the cluster service.

The above-mentioned three steps are performed with the root user. For non-root users, use sudo to execute the commands.

Download and Install the NebulaGraph RPM Package

Execute the following commands:

wget https://os-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm
wget https://oss-cdn.nebula-graph.com.cn/package/3.0.0/nebula-graph-3.0.0.el7.x86_64.rpm.sha256sum.txt
rpm -ivh nebula-graph-3.0.0.el7.x86_64.rpm

Note: The default installation path is /usr/local/nebula. Make sure the memory resources are sufficient.

Modify Configuration Files in Batch

sed -i 's?--meta_server_addrs=,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=' *.conf
sed -i 's?--meta_server_addrs=,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=' *.conf
sed -i 's?--meta_server_addrs=,172.1x.x.176:9559,172.1x.1x.149:9559?g' *.conf
sed -i 's?--local_ip=' *.conf

Note: The IP addresses in the above commands are internal IP addresses, used for communication between nodes.

Start the NebulaGraph Cluster Service

/usr/local/nebula/scripts/nebula.service start all

The above command is to start the cluster service, the following is to check the status of the cluster service.

ps aux|grep nebula

The returned result shows 3 processes for each service.

/usr/local/nebula/bin/nebula-metad --flagfile /usr/local/nebula/etc/nebula-metad.conf
/usr/local/nebula/bin/nebula-graphd --flagfile /usr/local/nebula/etc/nebula-graphd.conf
/usr/local/nebula/bin/nebula-storaged --flagfile /usr/local/nebula/etc/nebula-storaged.conf

Note: If the returned result shows less than 3 processes, run /usr/local/nebula/scripts/nebula.service start all several times or run /usr/local/nebula/scripts/nebula.service restart all to restart the service.

Visualization Service

The NebulaGraph Studio is used as the visualize the NebulaGraph cluster service. Visit http://n01v:7001 to log into the NebulaGraph Studio. (This is my network environment which is not accessible from the Internet.)

  • Login: 10.x.x.1 (The IP of any node):9669
  • Username/Password: root/nebula

Refer to Use nGQL (CRUD) for common-used nGQL statements.

Use NebulaGraph

Add the Storage nodes to the cluster.

ADD HOSTS 172.x.x.121:9779,,;

List all nodes and check whether the status of the cluster is ONLINE by runningSHOW HOSTSorSHOW HOSTS META`. Create a graph space, equivalent to a database in relational databases.

CREATE SPACE mylove (partition_num = 15, replica_factor = 3, vid_type = FIXED_STRING(256));//The number of partitions is recommended to be 5 times the number of nodes; the number of replicas is an odd number, generally set to 3; if the data type of vids is string, make the length as long as a reasonable value, otherwise, it takes up too much disk space.

Create a Tag, equivalent to an entity.:

CREATE TAG entity (name string NULL, version string NULL);

Create an edge, equivalent to a relationship.

CREATE EDGE relation (name string NULL);  

Add limit when querying data, otherwise, the query may take a long time.

match (v) return v limit 100;

Read and Import CSV Files with Spark Connector


Code example of NebulaSparkWriterExample

import com.facebook.thrift.protocol.TCompactProtocol
import com.vesoft.nebula.connector.{
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
object NebulaSparkWriter {
  private val LOG = LoggerFactory.getLogger(this.getClass)
  var ip = ""
  def main(args: Array[String]): Unit = {
    val part = args(0)
    ip = args(1)
    val sparkConf = new SparkConf
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val spark = SparkSession
    if("1".equalsIgnoreCase(part)) writeVertex(spark)
    if("2".equalsIgnoreCase(part)) writeEdge(spark)
  def getNebulaConnectionConfig(): NebulaConnectionConfig = {
    val config =
        .withMetaAddress(ip + ":9559")
        .withGraphAddress(ip + ":9669")
  def writeVertex(spark: SparkSession): Unit = {
    LOG.info("start to write nebula vertices: 1 entity")
    val df = spark.read.option("sep", "\t").csv("/home/2022/project/origin_file/csv/tag/entity/").toDF("id", "name", "version")
    val config = getNebulaConnectionConfig()
    val nebulaWriteVertexConfig: WriteNebulaVertexConfig = WriteNebulaVertexConfig
    df.coalesce(1400).write.nebula(config, nebulaWriteVertexConfig).writeVertices()
  def writeEdge(spark: SparkSession): Unit = {
    LOG.info("start to write nebula edges: 2 entityRel")
    val df = spark.read.option("sep", "\t").csv("/home/2022/project/origin_file/csv/out/rel/relation/").toDF("src", "dst", "name")
    val config = getNebulaConnectionConfig()
    val nebulaWriteEdgeConfig: WriteNebulaEdgeConfig = WriteNebulaEdgeConfig
    df.coalesce(1400).write.nebula(config, nebulaWriteEdgeConfig).writeEdges()

Details on NebulaSparkWriterExample

The following are the details of the functions.

  • spark.sparkContext.setLogLevel("WARN"): Sets the log print level and prevents the INFO log.

  • withTimeout(Integer.MAX_VALUE): Sets the connection timeout. The timeout can be set as large as possible. The default value is 1 minute. When the number of timeouts is greater than that of retries, the Spark task fails.

  • option("sep", "\t"): Specifies the separator of CSV files, otherwise, it defaults to 1 column.

  • toDF("src", "dst", "name"): The schema of the dataset, i.e., converting Dataset<Row> to DataFrame, otherwise, the VidField cannot be specified.

  • withVidField("id"): This function only works when the column name is set, so a schema must be specified.

  • withVidAsProp(false): When the ID is set as the VID by default, data will not be written repeatedly as a property that will not take up disk space.

  • withSrcIdField("src"): Sets the IdField of the source vertex.

  • withDstIdField("dst"): Sets the IdField of the destination vertex.

  • withSrcAsProperty(false): Saves disk space.

  • withDstAsProperty(false): Saves disk space.

  • withBatch(1000): The number of rows of data imported in batch. WriteMode.UPDATE is <=512 by default. The value of WriteMode.INSERT can be larger (Gigabit NIC/Bandwidth 5Gbps /Local SSD = 1500)

  • coalesce(1500): Adjustable according to the number of concurrent tasks. Too much data on a single partition can easily cause executor OOM exceptions.

Submit Tasks to the Spark Cluster

nohup spark-submit  --master yarn --deploy-mode client --class com.xxx.nebula.connector.NebulaSparkWriter --conf spark.dynamicAllocation.enabled=false --conf spark.executor.memoryOverhead=10g  --conf spark.blacklist.enabled=false --conf spark.default.parallelism=1000 --driver-memory 10G --executor-memory 12G --executor-cores 4 --num-executors 180 ./example-3.0-SNAPSHOT.jar >  run-csv-nebula.log 2>&1 &

Monitoring with iotop

The iotop command can be used to monitor the disk IO of the Spark cluster.

Total DISK READ :      26.61 K/s | Total DISK WRITE :     383.77 M/s
Actual DISK READ:      26.61 K/s | Actual DISK WRITE:     431.75 M/s

Monitoring with top

The top command can be used to monitor the CPU and memory of the Spark cluster.

top - 16:03:01 up 8 days, 28 min,  1 user,  load average: 6.16, 6.53, 4.58
Tasks: 205 total,   1 running, 204 sleeping,   0 stopped,   0 zombie
%Cpu(s): 28.3 us, 14.2 sy,  0.0 ni, 56.0 id,  0.6 wa,  0.0 hi,  0.4 si,  0.5 st
KiB Mem : 13186284+total,  1135004 free, 31321240 used, 99406592 buff/cache
KiB Swap:        0 total,        0 free,        0 used. 99641296 avail Mem 
  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND                                                                         
27979 root      20   0 39.071g 0.026t   9936 S 564.6 20.8  83:22.03 nebula-storaged                                                                 
27920 root      20   0 2187476 804036   7672 S 128.2  0.6  17:13.75 nebula-graphd                                                                   
27875 root      20   0 6484644 1.990g   8588 S  58.5  1.6  14:14.22 nebula-metad     

Other Resource Monitoring

Service Optimization

Configuration Optimization of nebula-storaged.conf

Here, I modified the value of parameters in the nebula-storaged.conf file.

# Default reserved bytes for a batch operation.
# Default block cache size used in BlockBasedTable.
# The unit is MB. It is usually set to one third of the memory value. The memory is 128 GB.
############## rocksdb Options ##############
# In JSON, the name and value of each option of rocksdb DBOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
# In JSON, the name and value of each option of rocksdb ColumnFamilyOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
# In JSON, the name and value of each option of rocksdb BlockBasedTableOptions is a string. Such as “option_name”:“option_value”. Use a comma to separate each option.
# The number of processors per request.
# The interval time between clusters.
# The max value for batch import.
# Reduces memory usage when the value is true.
# The data is indirectly filtered at the bottom storage layer to prevent encountering the super node in the production environment.

Optimizations of the Linux System

ulimit -c unlimited
ulimit -n 130000
sysctl -w net.ipv4.tcp_slow_start_after_idle=0
sysctl -w net.core.somaxconn=2048
sysctl -w net.ipv4.tcp_max_syn_backlog=2048
sysctl -w net.core.netdev_max_backlog=3000
sysctl -w kernel.core_uses_pid=1

Verify Import Results

  • The entity insertion rate is approximately 27,837 items/s (for this import performance calculation only).

  • The relationship insertion rate is about 26,276 items/s (for this import performance calculation only).

  • The import performance will be better if the server is better configured. In addition, bandwidth, cross-datacenter or not, disk IO, and even network fluctuations are also factors that affect performance.

Performance Tests

  • Query for the specified entities based on properties.
  MATCH (v:entity) WHERE v.entity.name == 'Lifespan' RETURN v;

It takes 0.002558 (s).

  • One-hop queries.
  MATCH (v1:entity)-[e:propertiesRel]->(v2:attribute) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN v2 limit 100;  

It takes 0.003571 (s)

  • Two-hop queries.
  MATCH p=(v1:entity)-[e:propertiesRel*1..2]->(v2) WHERE id(v1) == '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' RETURN p;

It takes 0.005143 (s)

  • Query for all the properties of an edge.
  FETCH PROP ON propertiesRel '70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a' -> '0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256' YIELD properties(edge); 

It takes 0.001304 (s)

  match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1]->(v2) return p;

å It takes 0.02986 (s)

  match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*2]->(v2) return p; 

It takes 0.07937 (s)

  match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*3]->(v2) return p; 

It takes 0.269 (s)

  match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*4]->(v2) return p;

It takes 3.524859 (s)

```nebula match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..2]->(v2) return p;

  It takes 0.072367 (s)

nebula match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..3]->(v2) return p;

  It takes 0.279011 (s)

nebula match p=(v:entity{name:"张三"})-[e:entityRel|propertiesRel*1..4]->(v2) return p; It takes 3.728018 (s)

 - Query for the properties of vertices and the edge in the shortest path in bilateral directions between `A_vid` and `B_vid`.

nebula FIND SHORTEST PATH WITH PROP FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * BIDIRECT YIELD path AS p;

  It takes 0.003096 (s)

nebula FIND ALL PATH FROM "70da43c5e46f56c634547c7aded3639aa8a1565975303218e2a92af677a7ee3a" TO "0000002d2e88d7ba6659db83893dedf3b8678f3f80de4ffe3f8683694b63a256" OVER * WHERE propertiesRel.name is not EMPTY or propertiesRel.name >=0 YIELD path AS p;

  It takes 0.003656 (s)

# Problems Encountered

## Guava version conflict

Caused by: java.lang.NoSuchMethodError: com.google.common.base.Stopwatch.createStarted()Lcom/google/common/base/Stopwatch;

After troubleshooting, I found that one of the dependent modules used Guava version 22.0, while the Spark cluster came with 14.0, resulting in a conflict, and it did not work properly. For tasks running on Spark clusters, Spark loads Guava packages with higher priority than its packages.

The package we depend on uses methods that are newer in Guava version 22.0 and not available in version 14.0. Without being able to modify each other's code, the following options are available:

1. The Spark cluster package can be upgraded, which is risky and can cause unknown problems.

2. Rename your Guava package using the Maven plugin.

The second way is used here, using the Maven plugin shade (link: https://maven.apache.org/plugins/maven-shade-plugin/) to rename the package to solve the problem.

java org.apache.maven.plugins maven-shade-plugin 3.2.4 package shade com.google.common my_guava.common : META-INF/maven/* META-INF/.SF META-INF/.DSA META-INF/.RSA

## Spark Blacklist Mechanism

Blacklisting behavior can be configured via spark.blacklist.*. `` The default value ofspark.blacklist.enabledisfalse. If the value is set totrue, Spark will no longer schedule tasks to the blacklisted executors. The blacklisting algorithm can be further configured by settingspark.blacklist`.

*This article was written by Enqiu Jia and first published on the WeChat Official Account *只要5分钟不要996*. This article is translated and published with permission.