Neo4j Python 4 Driver – Example

While Neo4j 4.0 was released in December 2019, the Neo4j 4.0 Python driver wasn’t ready to be released at that time. In the last week or so, Neo4j released a 4.0.0a4 version of the driver that adds multi-database support. The driver documentation has been updated to show how to use the latest driver.

Someone asked recently for some sample code on how to use the driver. I created a sample gist showing SSL certificate support, reading a single result, reading multiple results and running create statements in a transaction block.
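If you don’t want to dig through the gist, here is a minimal sketch along the same lines. The URI, credentials and Cypher are placeholders, and only encrypted=True is shown; see the gist and the driver manual for the full SSL certificate and trust options.

from neo4j import GraphDatabase

# Placeholders for the connection details; encrypted=True turns on TLS and the
# driver manual covers the trust settings for self-signed certificates.
driver = GraphDatabase.driver("neo4j://myserver:7687",
                              auth=("neo4j", "password"),
                              encrypted=True)

def create_person(tx, name):
    tx.run("CREATE (:Person {name: $name})", name=name)

with driver.session(database="neo4j") as session:
    # Read a single result
    record = session.run("MATCH (p:Person) RETURN count(p) AS personCount").single()
    print(record["personCount"])

    # Read multiple results
    for rec in session.run("MATCH (p:Person) RETURN p.name AS name LIMIT 10"):
        print(rec["name"])

    # Run create statements in a transaction block
    session.write_transaction(create_person, "Dave")

driver.close()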

Give the driver a test drive and let us know if you run into any issues.

Kafka – Neo4j – SSL Config

This blog post provides guidance for configuring SSL security between Kafka and Neo4j, which encrypts the data flowing between the two systems. It does not address ACL configuration inside of Kafka.

Self-Signed Certificates

This section is based on https://medium.com/talking-tech-all-around/how-to-enable-and-verify-client-authentication-in-kafka-21e936e670e8
Make sure that you have truststore and keystore JKS files for each server. If you want a self-signed certificate, you can use the following commands:

mkdir security
cd security

export PASSWORD=password
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

Once the keystores are created, you have to move the kafka.client1.keystore.jks and kafka.client1.truststore.jks files to your Neo4j server.

Note: If you see the error Caused by: java.security.cert.CertificateException: No subject alternative names present when querying the topic, this article discusses how to address it: https://geekflare.com/san-ssl-certificate/

Kafka Configuration

Connect to your Kafka server and modify the config/server.properties file.
This configuration worked for me, but I have seen other configurations without the EXTERNAL and INTERNAL settings.
This configuration is for Kafka on AWS but should work in other environments as well.

listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:19092,CLIENT://0.0.0.0:9093,SSL://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,SSL:SSL
advertised.listeners=EXTERNAL://aws_public_ip:9092,INTERNAL://aws_internal_ip:19092,CLIENT://aws_public_ip:9093,SSL://aws_public_ip:9094
inter.broker.listener.name=INTERNAL

ssl.keystore.location=/home/kafka/security/kafka.server.keystore.jks
ssl.keystore.password=neo4jpassword
ssl.truststore.location=/home/kafka/security/kafka.server.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.key.password=neo4jpassword
ssl.enabled.protocols=TLSv1.2,TLSv1.1

ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required

Neo4j Configuration

The following configuration is required in neo4j.conf. In this case, we are connecting to the public AWS IP address. The keystore and truststore locations point to the files that you created in the earlier steps.

Note: The passwords are stored in plaintext so limit access to this neo4j.conf file.

kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
streams.sink.enabled=false
streams.sink.polling.interval=1000

streams.source.topic.nodes.neoTest=Person{*}

kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=neo4j-dlq

kafka.acks=all
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432

kafka.security.protocol=SSL
kafka.ssl.truststore.location=/home/ubuntu/security/kafka.client1.truststore.jks
kafka.ssl.truststore.password=neo4jpassword
kafka.ssl.keystore.location=/home/ubuntu/security/kafka.client1.keystore.jks
kafka.ssl.keystore.password=neo4jpassword
kafka.ssl.key.password=neo4jpassword
kafka.ssl.endpoint.identification.algorithm=HTTPS

dbms.security.procedures.whitelist=apoc.*
dbms.security.procedures.unrestricted=apoc.*
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

The dbms.jvm.additional=-Djavax.net.debug=ssl:handshake line is optional but helps when debugging SSL issues.

Testing

After starting Kafka and Neo4j, you can test by creating a Person node in Neo4j and then querying the topic as follows:

 ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic neoTest --from-beginning 
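The Person node itself can be created in the Neo4j Browser or with a quick script. For example, with the Python driver (connection details are placeholders):

from neo4j import GraphDatabase

# Creating a Person node should publish an event to the neoTest topic,
# per the streams.source.topic.nodes.neoTest setting above.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
with driver.session() as session:
    session.run("CREATE (:Person {name: $name, surname: $surname})",
                name="Test", surname="Person")
driver.close()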

If you want to test using SSL, you would do the following:

1. Create a client-ssl.properties file consisting of:
security.protocol=SSL
ssl.truststore.location=/home/kafka/security/kafka.client1.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.endpoint.identification.algorithm=

2. Query the topic:


./bin/kafka-console-consumer.sh --bootstrap-server 18.217.67.191:9094 --topic neoTest --consumer.config /home/kafka/client-ssl.properties --from-beginning
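If you would rather test from Python, the same settings translate to the kafka-python package. Note that kafka-python expects PEM files rather than JKS keystores, so this sketch assumes the certificates have been exported from the JKS files created earlier (the PEM paths are assumptions):

from kafka import KafkaConsumer

# Sketch only: ca.pem, client-cert.pem and client-key.pem are assumed to have been
# exported from the truststore/keystore JKS files.
consumer = KafkaConsumer(
    "neoTest",
    bootstrap_servers="18.217.67.191:9094",
    security_protocol="SSL",
    ssl_check_hostname=False,   # mirrors the blank ssl.endpoint.identification.algorithm above
    ssl_cafile="/home/kafka/security/ca.pem",
    ssl_certfile="/home/kafka/security/client-cert.pem",
    ssl_keyfile="/home/kafka/security/client-key.pem",
    auto_offset_reset="earliest",
    consumer_timeout_ms=10000,
)
for message in consumer:
    print(message.value)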

Spark to Neo4j

This blog post is mostly for my own benefit, so I can go back and remember work that I have done. With that said, it covers the Neo4j Spark Connector, which allows a user to write from a Spark DataFrame to a Neo4j database. I will show the configuration settings within the Databricks Spark cluster, build some DataFrames within the Spark notebook, and then write nodes to Neo4j and merge nodes to Neo4j.

For this blog post, I am using the community Databricks site to run a simple Spark cluster.

Configuration:

For the cluster, I chose Databricks Runtime 5.4, which includes Apache Spark 2.4.3 and Scala 2.11. There are some Spark configuration options that set the Neo4j user, the Neo4j password and the Neo4j URL for the Bolt protocol.

Under the Libraries, I loaded the Neo4j Spark Connector library.

Once the configuration is updated and the library is added, we can restart the cluster.

As a side note, the Notebook capability is a great feature.

Creating Nodes:

First, I loaded the Chicago crime data into a DataFrame. Second, I converted the DataFrame into a table. From the table, we’ll run some SQL and load the results into Neo4j. These steps are shown below.

Loading CSV into DataFrame
Creating a Table
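In PySpark, those two steps look roughly like this (the DBFS path and table name are assumptions; spark is the SparkSession that Databricks provides in the notebook):

# Load the Chicago crime CSV into a DataFrame and expose it as a SQL table.
crime_df = (spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv("/FileStore/tables/chicago_crime.csv"))

crime_df.createOrReplaceTempView("crime")
spark.sql("SELECT count(*) FROM crime").show()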

This next screen shows how we can get a Spark context and connect to Neo4j. Once we have the context, we can execute several Cypher queries. For our example, we are going to delete existing data and create a constraint and an index.

Connecting to Neo4j and Running Queries

In this step, we can select data from the DataFrame and load that data into Neo4j using the Neo4jDataFrame.createNodes option.

Loading Crime Nodes into Neo4j
Merging Crime Beat Nodes
Loading Data into Neo4j Graph Algorithm
Executing a Graph Algorithm

As you see, we can move data from Spark to Neo4j in a pretty seamless manner. The entire Spark notebook can be found here.

In the next post, we will connect Spark to Kafka and then allow Neo4j to read from the Kafka topic.

Neo4j 4.0 Docker

With the upcoming Neo4j 4.0 release, it was time to revisit deploying a Neo4j causal cluster via Docker. Our fantastic partner GraphAware published a quickstart for deploying a cluster in Docker.

Docker is a tool designed to make it easy to create, deploy, and run applications by using containers. Containers allow developers to package an application with all of the components it needs and distribute it as an atomic, universal unit that can run on any host platform.

This docker-compose.yml file will start a three-core causal cluster.

  1. Download the docker-compose.yml
  2. Open a command shell in the same directory and execute:

docker-compose up

That’s it! After allowing each instance to come to life and to discover each other, the cluster is up and running.

Note: There are still some warnings that show up in the logs but these are items that are being worked on.

Neo4j 4.0 changes:

One change that requires your attention: the clustering protocols now use advertised_address instead of listen_address.

  • NEO4J_causal__clustering_discovery__advertised__address=core2:5000
  • NEO4J_causal__clustering_transaction__advertised__address=core2:6000
  • NEO4J_causal__clustering_raft__advertised__address=core2:7000

When finished, the cluster can be shut down by opening a shell in the same directory as docker-compose.yml and executing:

docker-compose down

Neo4j – Kafka – MySQL: Configuration – Part 2

In Part 1, we configured Neo4j, Kafka and MySQL to talk to each other using the Neo4j Kafka plugin and Maxwell’s Daemon. In Part 2, I will show how data can be added into MySQL and then added, modified or deleted in Neo4j through the Kafka connector.

For our dataset, I chose the MusicBrainz dataset. This dataset has several tables and a good amount of data to test with. For this test, I am only going to use the label and artist tables, but you could easily add more tables and more data.

For the Label table, here is the MySQL schema.

CREATE TABLE `label` (
  `id` bigint(20) DEFAULT NULL,
  `gid` varchar(100) DEFAULT NULL,
  `name` varchar(200) DEFAULT NULL,
  `sort_name` varchar(200) DEFAULT NULL,
  `type` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

When we start Maxwell’s Daemon, it automatically creates two topics on our Kafka machine. One is musicbrainz_musicbrainz_artist and the other is musicbrainz_musicbrainz_label.

When we do a CRUD operation on a MySQL table, Maxwell’s Daemon will write the operation type to the Kafka topic. For example, here is what an insert into the artist table looks like:

{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":87,"data":{"gid":"174442ec-e2ac-451a-a9d5-9a0669fa2edd","name":"Goldcard","sort_name":"Goldcard","id":50419}}
{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":88,"data":{"gid":"2b99cd8e-55de-4a42-9cb8-6489f3195a4b","name":"Banda Black Rio","sort_name":"Banda Black Rio","id":106851}}
{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":89,"data":{"gid":"38927bad-687f-4189-8dcf-edf1b8f716b4","name":"Kauri Kallas","sort_name":"Kallas, Kauri","id":883445}}

The Cypher statement in our neo4j.conf file will read from that topic, determine the operation (insert, update or delete) and modify data in Neo4j appropriately.

streams.sink.topic.cypher.musicbrainz_musicbrainz_artist=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name)

To show how this works, we will remove a Label and then re-add that same label. In our Neo4j database, we already have 163K labels. We will delete the XTOY label from MySQL and watch the delete get placed on the Kafka queue and then removed from Neo4j.

In Neo4j, here is the XTOY label.

In MySQL, we are going to run:
delete from label where name='XTOY'

In our Kafka musicbrainz_musicbrainz_label topic, we see a delete statement coming from MySQL:

{"database":"musicbrainz","table":"label","type":"delete","ts":1563197724,"xid":22983,"commit":true,"data":{"id":27894,"gid":"e8c1f93b-f518-43f2-b9be-e00e31a5e92d","name":"XTOY","sort_name":"2000","type":-1}}

Neo4j polls the topic, evaluates the type of action and acts appropriately. In this case, it should delete the XTOY label. Let’s look at Neo4j now and see if the XTOY label has been removed:

We see that it has been removed. Now, let’s reinsert the record into MySQL and see if Neo4j picks up the insert.

INSERT INTO label(id,gid,name, sort_name, type) values(27894,'e8c1f93b-f518-43f2-b9be-e00e31a5e92d','XTOY','XTOY',-1)

{"database":"musicbrainz","table":"label","type":"insert","ts":1563197966,"xid":23309,"commit":true,"data":{"id":27894,"gid":"e8c1f93b-f518-43f2-b9be-e00e31a5e92d","name":"XTOY","sort_name":"XTOY","type":-1}}

Checking Neo4j once more we see the node has been added in.

The data is automatically replicated across the Neo4j cluster so we don’t have to worry about that aspect.

In summary, it is straightforward to sync database changes from MySQL to Neo4j through Kafka.

Neo4j – Kafka – MySQL: Configuration – Part 1

With the new Neo4j Kafka streams now available, there have been a few articles such as A New Neo4j Integration with Apache Kafka, How to leverage Neo4j Streams and build a just-in-time data warehouse, Processing Neo4j Transaction Events with KSQL and Kafka Streams, and How to embrace event-driven graph analytics using Neo4j and Apache Kafka. In this post, I will discuss configuring a Neo4j cluster that will use the Neo4j Kafka Streams to connect to a Kafka server. I will also talk about configuring Maxwell’s Daemon to stream data from MySQL to Kafka and then on to Neo4j.

The new Neo4j Kafka streams library is a Neo4j plugin that you can add to each of your Neo4j instances. It enables three types of Apache Kafka mechanisms:

  • Producer: based on the topics set up in the Neo4j configuration file. Outputs to said topics will happen when specified node or relationship types change
  • Consumer: based on the topics set up in the Neo4j configuration file. When events for said topics are picked up, the specified Cypher query for each topic will be executed
  • Procedure: a direct call in Cypher to publish a given payload to a specified topic

You can get a more detailed overview of what each of these looks like here.

Kafka

For Kafka, I configured an AWS EC2 instance to serve as my Kafka machine. For the setup, I followed the instructions from the quick start guide up until step 2. Before we get Kafka up and running, we will need to set up the consumer elements in the Neo4j configuration files.

If you are using the Dead Letter Queue functionality in the Neo4j Kafka connector, you will have to create that topic yourself. For the MySQL sync topics, Maxwell’s Daemon will automatically create those based on the settings in the config.properties file.
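For example, the DLQ topic used later in this post (person-dlq) could be created with the bundled kafka-topics.sh script, or from Python with kafka-python (a sketch; the broker address is a placeholder):

from kafka.admin import KafkaAdminClient, NewTopic

# Create the Dead Letter Queue topic referenced by streams.sink.dlq below.
admin = KafkaAdminClient(bootstrap_servers="xxx.xxx.xxx.xxx:9092")
admin.create_topics([NewTopic(name="person-dlq", num_partitions=1, replication_factor=1)])
admin.close()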

MySQL

For MySQL, I set up a simple MySQL server on Ubuntu. A couple of things to note:

  • I had to modify the /etc/mysql/my.cnf file and add:
[mysqld]
server_id=21
log-bin=master
binlog_format=row
  • I had to create a Maxwell user.
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'YourStrongPassword';
	GRANT ALL ON maxwell.* TO 'maxwell'@'%';
	GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Maxwell’s Daemon

For the sync between MySQL and Kafka, I used Maxwell’s daemon, an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. Maxwell has low operational overhead, requiring nothing but mysql and a place to write to. Its common use cases include ETL, cache building/expiring, metrics collection, search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform.

I downloaded Maxwell’s Daemon and installed it on the MySQL server. I then made the following configuration changes in the config.properties file.


producer=kafka
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9092
# mysql login info
host=xxx.xxx.xxx.xxx
user=maxwell
password=YourStrongMaxwellPassword
kafka_topic=blogpost_%{database}_%{table}

By configuring the kafka_topic to be linked to the database and the table, Maxwell automatically creates a topic for each table in the database.
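A quick way to confirm that the topics exist is to list them with kafka-python (a sketch; the broker address is a placeholder):

from kafka import KafkaConsumer

# List the topics visible on the broker; the Maxwell-created topics
# (one per MySQL table) should show up here.
consumer = KafkaConsumer(bootstrap_servers="xxx.xxx.xxx.xxx:9092")
print(sorted(consumer.topics()))
consumer.close()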

Neo4j Cluster

Configuring a Neo4j cluster is covered in the Neo4j Operations Manual.

We will use the Neo4j Streams plugin. As the instructions say, download the latest release jar and copy it into $NEO4J_HOME/plugins on each of the Neo4j cluster members. Then we will need to do some configuration.

In the neo4j.conf file, we will need to configure the Neo4j Kafka plugin. This configuration will be the same for all Core servers in the Neo4j cluster. The neo4j.conf configuration is as follows:


### Neo4j.conf

kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9092
streams.sink.enabled=true
streams.sink.polling.interval=1000

streams.sink.topic.cypher.Neo4jPersonTest=MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)

streams.sink.topic.cypher.musicbrainz_musicbrainz_artist=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name)

streams.sink.topic.cypher.musicbrainz_musicbrainz_label=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type)

kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=person-dlq

kafka.acks=all
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432

The Neo4j Kafka plugin will poll to see which Neo4j cluster member is the leader. The cluster leader will automatically poll the Kafka topics for data changes. If the cluster leader switches, the new leader will take over the polling and the retrieval from the topics.

If you want to change the number of records that Neo4j pulls from the Kafka topic, you can add this setting to your neo4j.conf file:

kafka.max.poll.records=5000

Once all of the configuration is completed, I have a running MySQL instance, a Kafka instance, Maxwell’s Daemon configured to read from MySQL and write to Kafka topics, and a Neo4j cluster that will read from the Kafka topics.

In part 2, we will show how this all works together.

Neo4j – Finding a doctor on the way to find coffee

I love coffee. I love finding new coffee shops and trying out great coffee roasters. Some of my favorites are Great Lakes Coffee, Stumptown Roasters in NYC’s Ace Hotel, Red Rooster (my choice for home delivery) and my go-to local coffee shop, The Grounds. The Grounds is owned by friends and you have to love their hashtag #LoveCoffeeLovePeople.

You are probably asking what this has to do with Neo4j or anything in general. Go pour yourself a cup of coffee and then come back to see where this leads.

In the Neo4j Uber H3 blog post, you saw how we could use the hex address to find doctors within a radius, a bounding box, a polygon, or even along a line between locations. The line-between-locations feature got me thinking: what if I could combine it with turn-by-turn directions to find a doctor along the route? You never know when you may have had too much coffee and need to find the closest doctor. Or maybe you have a lot of event data (IED explosions, for example) and you want to see which events have occurred along a proposed route.

Let’s see if we can pull this off. I remembered that I had written some Python code in conjunction with the Google Directions API. The Directions API takes in a start address and an end address. For example:

https://maps.googleapis.com/maps/api/directions/json?origin='1700 Courthouse Rd, Stafford, VA 22554'&destination='50 N. Stafford Complex, Center St Suite 107, Stafford, VA 22556'&key='yourapikey'

The API returns a JSON document which includes routes and legs. Each leg has a start_location with a lat and lng value and an end_location with a lat and lng value. Check the link for details on the JSON format.

In my python code, I make a call to the Directions API. I then parse the routes and associated legs (that just sounds weird) to get the start lat/lng and the end lat/lng for each leg. I can then pass the pair into my Neo4j procedure (com.dfauth.h3.lineBetweenLocations), get the results and output the providers that are along that line. Here’s an example:

neoQuery = "CALL com.dfauth.h3.lineBetweenLocations(" + str(prevLat) + "," + str(prevLng) + "," + str(endlat) + "," +  str(endlng) +") yield nodes unwind nodes as locNode match (locNode)<-[:HAS_PRACTICE_AT]-(p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode) return distinct locNode.Address1 + ' ' + locNode.CityName +', ' + locNode.StateName as locationAddress, locNode.latitude as latitude, locNode.longitude as longitude, coalesce(p.BusinessName,p.FirstName + ' ' + p.LastName) as practiceName, p.NPI as NPI order by locationAddress;"
#	    		print(neoQuery)
	    		result = session.run(neoQuery)
	    		for record in result:
	    			print(record["locationAddress"] + " " + record["practiceName"])

When I tried this from my house to The Grounds, I got these results:

On the 15.5-minute drive, I would pass about 10 doctors. Now, the NPI data isn’t totally accurate, but you can see what is available along the route.

I thought it was cool. My Python code (minus the API keys) is on GitHub. Refill your coffee and thanks for reading.

Neo4j – Uber H3 – Geospatial

We are going to take a slight detour from the healthcare blog series and talk about Uber H3. H3 is a hexagonal hierarchical geospatial indexing system. It comes with an API for indexing coordinates into a global grid. The grid is fully global and you can choose your resolution. The advantages and disadvantages of the hexagonal grid system are discussed here and here. Uber open-sourced the H3 indexing system, and it comes with a set of Java bindings that we will use with Neo4j.

Since version 3.4, Neo4j has had a native geospatial datatype. Neo4j uses the WGS-84 and WGS-84 3D coordinate reference systems. Within Neo4j, we can index these point properties and query using the distance function or within a bounding box. Max DeMarzi blogged about this, as did Neo4j. At GraphConnect 2018, Will Lyon and Craig Taverner had a session on Neo4j and Going Spatial. These are all great resources for Neo4j and geospatial search.

Why Uber H3

Uber H3 adds some new features that can be extended through Neo4j procedures. Specifically, we want to be able to query based on a polygon, query based on a polygon with holes, and even along a line. We will use the data from our healthcare demo dataset and show how we can use H3 hexagon addresses to help our queries.

H3 Procedure

For our first procedure, we will pass in the latitude, longitude and resolution and receive a hexAddress back.
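The procedure itself wraps the H3 Java bindings, but for a sense of what that call does, here is the equivalent lookup with the h3 Python package (an illustration only, not the procedure’s implementation):

import h3

# Index a latitude/longitude pair at resolution 9 and get the hex address back.
hex_address = h3.geo_to_h3(38.418582, -77.385268, 9)
print(hex_address)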

The H3 interface also allows us to query via a polygon. With Neo4j, we can query using a bounding box like so:

Neo4j Bounding Box Query

In H3, the polygon search looks like this:

This returns in 17ms against 4.4 million locations.

If we combine this with our healthcare data, we can find providers with a certain taxonomy.

We can make this polygon as complicated as we need. This example is for a county polygon:

CALL com.dfauth.h3.polygonSearch([{lon:"-77.302457",lat:"38.504683"},{lon:"-77.310334",lat:"38.493926"},{lon:"-77.322622",lat:"38.467131"},{lon:"-77.32544",lat:"38.44885"},{lon:"-77.319036",lat:"38.417803"},{lon:"-77.310719",lat:"38.397669"},{lon:"-77.312201",lat:"38.390958"},{lon:"-77.314848",lat:"38.389579"},{lon:"-77.317288",lat:"38.383576"},{lon:"-77.296077",lat:"38.369797"},{lon:"-77.288145",lat:"38.359477"},{lon:"-77.28835",lat:"38.351286"},{lon:"-77.286202",lat:"38.347025"},{lon:"-77.286202",lat:"38.347024"},{lon:"-77.321403",lat:"38.345226"},{lon:"-77.339268",lat:"38.302723"},{lon:"-77.345728",lat:"38.26139"},{lon:"-77.326692",lat:"38.245136"},{lon:"-77.370301",lat:"38.246576"},{lon:"-77.39085",lat:"38.245589"},{lon:"-77.420148",lat:"38.257986"},{lon:"-77.447126",lat:"38.284614"},{lon:"-77.455692",lat:"38.301341"},{lon:"-77.467053",lat:"38.31866"},{lon:"-77.475137",lat:"38.32096"},{lon:"-77.478996",lat:"38.316693"},{lon:"-77.498754",lat:"38.32543"},{lon:"-77.506782",lat:"38.325925"},{lon:"-77.527185",lat:"38.320655"},{lon:"-77.526243",lat:"38.309531"},{lon:"-77.530289",lat:"38.309172"},{lon:"-77.54546",lat:"38.325081"},{lon:"-77.584673",lat:"38.346806"},{lon:"-77.594796",lat:"38.336022"},{lon:"-77.618727",lat:"38.367835"},{lon:"-77.634835",lat:"38.409713"},{lon:"-77.628433",lat:"38.452075"},{lon:"-77.634157",lat:"38.464508"},{lon:"-77.568349",lat:"38.520177"},{lon:"-77.530914",lat:"38.555929"},{lon:"-77.481488",lat:"38.592432"},{lon:"-77.463949",lat:"38.578686"},{lon:"-77.448683",lat:"38.580792"},{lon:"-77.395824",lat:"38.545827"},{lon:"-77.370142",lat:"38.519865"},{lon:"-77.334902",lat:"38.514569"},{lon:"-77.308138",lat:"38.499699"},{lon:"-77.310528",lat:"38.505289"},{lon:"-77.302457",lat:"38.504683"}],[{}]) yield nodes return nodes

In these examples, there are no “donut holes”. If I need a polygon donut hole to exclude an area, I can pass it in as the second parameter.

CALL com.dfauth.h3.polygonSearch([{lon:"-77.302457",lat:"38.504683"},{lon:"-77.310334",lat:"38.493926"},{lon:"-77.322622",lat:"38.467131"},{lon:"-77.32544",lat:"38.44885"},{lon:"-77.319036",lat:"38.417803"},{lon:"-77.310719",lat:"38.397669"},{lon:"-77.312201",lat:"38.390958"},{lon:"-77.314848",lat:"38.389579"},{lon:"-77.317288",lat:"38.383576"},{lon:"-77.296077",lat:"38.369797"},{lon:"-77.288145",lat:"38.359477"},{lon:"-77.28835",lat:"38.351286"},{lon:"-77.286202",lat:"38.347025"},{lon:"-77.286202",lat:"38.347024"},{lon:"-77.321403",lat:"38.345226"},{lon:"-77.339268",lat:"38.302723"},{lon:"-77.345728",lat:"38.26139"},{lon:"-77.326692",lat:"38.245136"},{lon:"-77.370301",lat:"38.246576"},{lon:"-77.39085",lat:"38.245589"},{lon:"-77.420148",lat:"38.257986"},{lon:"-77.447126",lat:"38.284614"},{lon:"-77.455692",lat:"38.301341"},{lon:"-77.467053",lat:"38.31866"},{lon:"-77.475137",lat:"38.32096"},{lon:"-77.478996",lat:"38.316693"},{lon:"-77.498754",lat:"38.32543"},{lon:"-77.506782",lat:"38.325925"},{lon:"-77.527185",lat:"38.320655"},{lon:"-77.526243",lat:"38.309531"},{lon:"-77.530289",lat:"38.309172"},{lon:"-77.54546",lat:"38.325081"},{lon:"-77.584673",lat:"38.346806"},{lon:"-77.594796",lat:"38.336022"},{lon:"-77.618727",lat:"38.367835"},{lon:"-77.634835",lat:"38.409713"},{lon:"-77.628433",lat:"38.452075"},{lon:"-77.634157",lat:"38.464508"},{lon:"-77.568349",lat:"38.520177"},{lon:"-77.530914",lat:"38.555929"},{lon:"-77.481488",lat:"38.592432"},{lon:"-77.463949",lat:"38.578686"},{lon:"-77.448683",lat:"38.580792"},{lon:"-77.395824",lat:"38.545827"},{lon:"-77.370142",lat:"38.519865"},{lon:"-77.334902",lat:"38.514569"},{lon:"-77.308138",lat:"38.499699"},{lon:"-77.310528",lat:"38.505289"},{lon:"-77.302457",lat:"38.504683"}],[{lon:'-77.530886',lat:'38.3609911'},{lon:'-77.524492',lat:'38.3411698'},{lon:'-77.524492',lat:'38.277433'},{lon:'-77.5731773',lat:'38.2774607'},{lon:'-77.594635',lat:'38.2771873'}]) yield nodes return nodes

In the 3.3.0 release of the Java bindings, H3 included the ability to return all hex addresses along a line between two points. One could combine this with a service like Google Directions to find all locations along a route. Imagine finding doctors along a route, or finding all events that occurred along a route.
Here is an example that returns providers who have billing addresses along a line:

CALL com.dfauth.h3.lineBetweenLocations(38.418582, -77.385268,38.500603, -77.444288) yield nodes 
unwind nodes as locNode 
match (locNode)<-[:HAS_BILLING_ADDRESS_AT]-(p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode) 
return distinct locNode.Address1 + ' ' + locNode.CityName +', ' + locNode.StateName as locationAddress, locNode.latitude as latitude, locNode.longitude as longitude, coalesce(p.BusinessName,p.FirstName + ' ' + p.LastName) as practiceName, p.NPI as NPI
order by locationAddress;

Pretty neat, right? As always, the source code is available on GitHub.

Neo4j – Leveraging a graph for healthcare search

Graph-based search is intelligent: You can ask much more precise and useful questions and get back the most relevant and meaningful information, whereas traditional keyword-based search delivers results that are more random, diluted and low-quality.

With graph-based search, you can easily query all of your connected data in real time, then focus on the answers provided and launch new real-time searches prompted by the insights you’ve discovered.

Recently, I’ve been developing proof-of-concept (POC) demonstrations showing the power of graph-based search with healthcare data. In this series of blog posts, we will implement a data model that powers graph-based search solutions for the healthcare community.

Our first scenario will be to use Neo4j for some simple search: find a provider based on location and/or specialty. As my colleague Max DeMarzi says, “The optimal model is heavily dependent on the queries you want to ask”. Queries that we want to ask are things like: For the State of Virginia, can I find a provider near my location? Where is the nearest pharmacy? Can we break that down by county, or by county and specialty? The queries drive our model. The model that we will use for this scenario, and one we will build on later, looks like this:

What we see is that there is a Provider who has a Specialty and multiple locations (practice, billing and alternate practice). Postal codes are in a county, and postal codes are a certain distance in miles from each other. We have also loaded the DocGraph Referral/Patient Sharing dataset.
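As a preview of the kind of query this model supports, here is a sketch using the Python driver. The Provider, TaxonomyCode, HAS_SPECIALTY and HAS_PRACTICE_AT names come from the model above; the postal-code and county relationship and property names are placeholder guesses until the data-loading post:

from neo4j import GraphDatabase

# Connection details are placeholders; :HAS_POSTAL_CODE, :IN_COUNTY and the
# property names on PostalCode/County are illustrative guesses.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

query = """
MATCH (p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode {Classification: $specialty}),
      (p)-[:HAS_PRACTICE_AT]->(loc)-[:HAS_POSTAL_CODE]->(:PostalCode)-[:IN_COUNTY]->(c:County {name: $county})
RETURN coalesce(p.BusinessName, p.FirstName + ' ' + p.LastName) AS provider,
       loc.Address1 AS address
LIMIT 25
"""

with driver.session() as session:
    for record in session.run(query, specialty="Pharmacy", county="Stafford"):
        print(record["provider"], record["address"])
driver.close()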

Data Sources
The US Government’s Centers for Medicare and Medicaid Services (CMS) has a whole treasure trove of data resources that we can load into Neo4j and use for our POC.

Our first dataset is the NPPES Data Dissemination File. This file contains information on healthcare providers in the US. The key field is the NPI, which is a unique 10-digit identification number issued to health care providers in the United States by CMS.

Our second dataset is a summary spreadsheet showing the number of patients in managed services by county. I’ll use this file to generate patients per county and distribute them across the US.

Our third dataset is the Physician and Other Supplier Public Use File (Physician and Other Supplier PUF). This file provides information on services and procedures provided to Medicare beneficiaries by physicians and other healthcare professionals. The Physician and Other Supplier PUF contains information on utilization, payment (allowed amount and Medicare payment), and submitted charges organized by National Provider Identifier (NPI), Healthcare Common Procedure Coding System (HCPCS) code, and place of service. I’ll use this file for data validation and for building out provider specialties.

Our fourth dataset is the Public Provider Enrollment Data. The Public Provider Enrollment data for Medicare fee-for-service includes providers who are actively approved to bill Medicare or have completed the 855O at the time the data was pulled from the Provider Enrollment and Chain Ownership System (PECOS). These files are populated from PECOS and contain basic enrollment and provider information, reassignment of benefits information and practice location city, state and zip.

Finally, we have a dataset that maps US Counties to US Zip Codes and a dataset that maps the distance between zip codes.

Data Downloads:
You can download the data files from:

NPPES Data Dissemination File
NUCC Taxonomy
Counties to Zip Code / FIPS
Zip Code Tabulation Area
Zip Code Distance

Data Loading

In the next post, I will document the data loading procedures that I wrote for this data.

Additional Information – Public Training (March 2019)

For the first time, Datapalooza is hosting pre-conference classes on open data.
Here is the link to sign up; you do not need to attend Datapalooza to attend the classes.
We are offering training on the following open datasets at this year’s Datapalooza:
  • Introduction to NPI and NPPES
  • Introduction to NDC data
  • Referral/Patient Sharing Data Tutorial (will cover all versions of the docgraph dataset)
  • Open Payments data

Modeling events in Neo4j to look for patterns

Recently, some of the prospects that I work with have wanted to understand event data and felt like a graph database would be the best approach. Being new to graphs, they aren’t always sure of the best modeling approach.

There are some resources available that talk about modeling events. For example, Neo4j’s own Mark Needham published a blog post showing how to model TV shows, among other events. Snowplow published a recent blog post that describes a similar data model.

One thing that we always try to stress is to build the model to effectively answer the questions you ask of the graph. In this prospect’s case, they were collecting item scans that occurred within a warehouse. Each item had a series of scans as it moved through the warehouse. Based on the individual item or even a type of item, could they discover common patterns or deviations from the normal pattern? Another use case for this would be to track how users traverse a website or how students complete an on-line course.

An initial approach might be to create a relationship between each item and the scanner. The model is shown below:

An (:Item)-[:HAS_TAG]->(:ItemTag)-[:SCANNED]->(:Scanner) is the Cypher pattern for this model.

In this model, it is easy to understand how many times an :ItemTag was scanned. Over time, however, a Scanner could easily become a supernode with millions of :SCANNED relationships attached to it. Furthermore, if we want to find common patterns, we have to look at all :SCANNED relationships, order them by a property and then do comparisons. The amount of work increases as the number of :SCANNED relationships increases. This is similar to the AIRPORT example in Max’s blog post.

So what can we do? Let’s change the model to a chain of :Scan events running from the initial scan to the final scan. For the relationship types, instead of using :NEXT or :PREVIOUS, we will name each relationship type after the scanner ID. Let’s look at that model:


In this model, an :Item has an :ItemDay. This allows us to manage and search all tags by item and by day without creating supernodes. Between each :Scan node, we use a specific relationship type that indicates the scanner. This allows us to easily find patterns within the data. Neo4j allows for over 64k custom relationship types, so you should be good with this approach.
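Loading the chain with scanner-named relationship types is straightforward with APOC’s apoc.create.relationship, which takes the relationship type as a string. Here is a sketch; the node keys, property names and connection details are hypothetical:

from neo4j import GraphDatabase

# Hypothetical loader: append a new :Scan to the end of a tag's chain and link it
# with a relationship type named after the scanner (e.g. SCANNER_101).
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

APPEND_SCAN = """
MATCH (t:Tag {TagID: $tagId})
OPTIONAL MATCH (t)-[*]->(last:Scan)
WHERE NOT (last)-->(:Scan)
CREATE (s:Scan {scanDateTime: datetime($scannedAt)})
WITH coalesce(last, t) AS prev, s
CALL apoc.create.relationship(prev, 'SCANNER_' + $scannerId, {}, s) YIELD rel
RETURN type(rel)
"""

with driver.session() as session:
    session.run(APPEND_SCAN, tagId="TAG-0001", scannerId="101",
                scannedAt="2019-07-01T08:30:00")
driver.close()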

If you were modeling a TV show, you may want to track when a NEW episode was shown versus a REPEAT episode. You could model that as:
(:Episode)-[:NEW]->(:Episode)-[:REPEAT]->(:Episode)-[:REPEAT]->(:Episode)

For an item, we can find the most common patterns using the below Cypher code:

match (r:Tag)
with r
match path=(r)-[re*..50]->(:Scan)
with r.TagID as TagID,  [r IN re | TYPE(r)] as paths
order by length(path) desc 
with collect(paths) as allPaths, TagID
return count(TagID) as tagCount, head(allPaths)
order by tagCount desc;

The use of the variable length relationship allows us to easily find those patterns no matter the length. If we want to find an anomaly such as where :SCANNER_101 wasn’t the first scan, we could do something like:

match path=(a1:Tag)-[r2]->(a:Scan)-[r1:SCANNER_101]->(b:Scan)
return count(path);

With the ability to add native date/time properties to a node, we can use Cypher to calculate durations between event scans. For example, this snippet shows the minimum, maximum and average duration between scans for each type of scan.

match (a:Scan)-[r1]->(b:Scan)
with r1, duration.between(a.scanDateTime,b.scanDateTime) as theduration
return type(r1), max(theduration.minutes), min(theduration.minutes), avg(theduration.minutes)
order by type(r1) asc, avg(theduration.minutes) desc;

When modeling data in Neo4j, think of how you want to traverse the graph to get the answers that you need. The answers will inevitably determine the shape of the graph and the model that you choose.

Resources: