Spark to Neo4j

This blog post is mostly for my own benefit, so that I can go back and remember work that I have done. With that said, it covers the Neo4j Spark Connector, which allows a user to write from a Spark DataFrame to a Neo4j database. I will show the configuration settings within the Databricks Spark cluster, build some DataFrames within the Spark notebook and then write nodes and merge nodes to Neo4j.

For this blog post, I am using the Databricks Community Edition to run a simple Spark cluster.

Configuration:

For the cluster, I chose the Databricks Runtime 5.4, which includes Apache Spark 2.4.3 and Scala 2.11. There are a few Spark configuration options that set the Neo4j user, the Neo4j password and the Neo4j URL for the Bolt protocol.
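
In the cluster's Spark config, those settings look something like this (the host and credentials are placeholders; the property names are the ones used by the 2.x Neo4j Spark Connector):

spark.neo4j.bolt.url bolt://your-neo4j-host:7687
spark.neo4j.bolt.user neo4j
spark.neo4j.bolt.password YourNeo4jPassword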

Under the Libraries, I loaded the Neo4j Spark Connector library.

Once the configuration is updated and the library added, we can restart the cluster.

As a side note, the Notebook capability is a great feature.

Creating Nodes:

First, I loaded the Chicago crime data into a DataFrame. Second, I converted the DataFrame into a table. From the table, we’ll run some SQL and load the results into Neo4j. These steps are shown below.

Loading CSV into DataFrame
Creating a Table
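
In PySpark, those two steps boil down to something like the following (the file path and table name are illustrative):

crimeDF = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("/FileStore/tables/chicago_crimes.csv")   # illustrative path

crimeDF.createOrReplaceTempView("chicagoCrime")      # now queryable via spark.sql(...)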

This next screen shows how we can get a Spark context and connect to Neo4j. Once we have the context, we can execute several Cypher queries. For our example, we are going to delete the existing data and create a constraint and an index.

Connecting to Neo4j and Running Queries
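
The setup queries themselves are plain Cypher along these lines (Neo4j 3.x syntax; the property names are illustrative):

MATCH (n) DETACH DELETE n;
CREATE CONSTRAINT ON (c:Crime) ASSERT c.caseNumber IS UNIQUE;
CREATE INDEX ON :Beat(beatId);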

In this step, we can select data from the DataFrame and load it into Neo4j using the Neo4jDataFrame.createNodes method.

Loading Crime Nodes into Neo4j
Merging Crime Beat Nodes
Loading Data into Neo4j Graph Algorithm
Executing a Graph Algorithm

As you can see, we can move data from Spark to Neo4j in a pretty seamless manner. The entire Spark notebook can be found here.

In the next post, we will connect Spark to Kafka and then allow Neo4j to read from the Kafka topic.

Neo4j 4.0 Docker

With the upcoming Neo4j 4.0 release, it was time to revisit deploying a Neo4j causal cluster via Docker. Our fantastic partner, GraphAware, published a quickstart for deploying a cluster in Docker.

Docker is a tool designed to make it easy to create, deploy, and run applications by using containers. Containers allow developers to package an application with all of the components it needs and distribute it as an atomic, universal unit that can run on any host platform.

This docker-compose.yml file will start a three-core causal cluster.

  1. Download the docker-compose.yml
  2. Open a command shell in the same directory and execute:

docker-compose up

That’s it! After each instance comes to life and discovers the others, the cluster is up and running.

Note: There are still some warnings that show up in the logs but these are items that are being worked on.

Neo4j 4.0 changes:

One change that requires your attention: the clustering protocols now use the advertised_address settings instead of listen_address. For example:

  • NEO4J_causal__clustering_discovery__advertised__address=core2:5000
  • NEO4J_causal__clustering_transaction__advertised__address=core2:6000
  • NEO4J_causal__clustering_raft__advertised__address=core2:7000
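
In docker-compose terms, a single core’s service definition ends up looking something like this sketch (the image tag, port mappings, credentials and three-member discovery list are illustrative; the advertised-address lines are the ones from above, and the GraphAware quickstart has the full, working file):

  core2:
    image: neo4j:4.0-enterprise
    hostname: core2
    environment:
      - NEO4J_ACCEPT_LICENSE_AGREEMENT=yes
      - NEO4J_dbms_mode=CORE
      - NEO4J_causal__clustering_minimum__core__cluster__size__at__formation=3
      - NEO4J_causal__clustering_initial__discovery__members=core1:5000,core2:5000,core3:5000
      - NEO4J_causal__clustering_discovery__advertised__address=core2:5000
      - NEO4J_causal__clustering_transaction__advertised__address=core2:6000
      - NEO4J_causal__clustering_raft__advertised__address=core2:7000
    ports:
      - "7475:7474"
      - "7688:7687"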

When finished, the cluster can be shut down by opening a shell in the same directory as docker-compose.yml and executing:

docker-compose down

Neo4j – Kafka – MySQL: Configuration – Part 2

In Part 1, we configured Neo4j, Kafka and MySQL to talk using the Neo4j Kafka plugin and Maxwell’s Daemon. In part 2, I will show how data can be added into MySQL and then added/modified/deleted in Neo4j through the Kafka connector.

For our dataset, I chose the MusicBrainz dataset. It has several tables and a good amount of data to test with. For this test, I am only going to use the Label and Artist tables, but you could easily add more tables and more data.

For the Label table, here is the MySQL schema.

CREATE TABLE `label` (
  `id` bigint(20) DEFAULT NULL,
  `gid` varchar(100) DEFAULT NULL,
  `name` varchar(200) DEFAULT NULL,
  `sort_name` varchar(200) DEFAULT NULL,
  `type` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

When we start Maxwell’s Daemon, it automatically creates two topics on our Kafka machine. One is musicbrainz_musicbrainz_artist and the other is musicbrainz_musicbrainz_label.

When we do a CRUD operation on a MySQL table, Maxwell’s Daemon will write the operation type to the Kafka topic. For example, here is what an insert into the Artist table looks like:

{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":87,"data":{"gid":"174442ec-e2ac-451a-a9d5-9a0669fa2edd","name":"Goldcard","sort_name":"Goldcard","id":50419}}
{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":88,"data":{"gid":"2b99cd8e-55de-4a42-9cb8-6489f3195a4b","name":"Banda Black Rio","sort_name":"Banda Black Rio","id":106851}}
{"database":"musicbrainz","table":"artist","type":"insert","ts":1563192678,"xid":13695,"xoffset":89,"data":{"gid":"38927bad-687f-4189-8dcf-edf1b8f716b4","name":"Kauri Kallas","sort_name":"Kallas, Kauri","id":883445}}

The Cypher statement in our neo4j.conf file will read from that topic, determine the operation (insert, update or delete) and modify the data in Neo4j appropriately.

streams.sink.topic.cypher.musicbrainz_musicbrainz_artist=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name)

To show how this works, we will remove a Label and then re-add that same label. In our Neo4j database, we already have 163K labels. We will delete the XTOY label from MySQL, watch the delete get placed on the Kafka topic and then see the label removed from Neo4j.

In Neo4j, here is the XTOY label.

In MySQL, we are going to run:
delete from label where name='XTOY'

In our Kafka musicbrainz_musicbrainz_label topic, we see a delete statement coming from MySQL:

{"database":"musicbrainz","table":"label","type":"delete","ts":1563197724,"xid":22983,"commit":true,"data":{"id":27894,"gid":"e8c1f93b-f518-43f2-b9be-e00e31a5e92d","name":"XTOY","sort_name":"2000","type":-1}}

Neo4j polls the topic, evaluates the type of action and acts appropriately. In this case, it should delete the XTOY label. Let’s look at Neo4j now and see if the XTOY label has been removed:

We see that it has been removed. Now, let’s reinsert the record into MySQL and see if Neo4j picks up the insert.

INSERT INTO label(id,gid,name, sort_name, type) values(27894,'e8c1f93b-f518-43f2-b9be-e00e31a5e92d','XTOY','XTOY',-1)

{"database":"musicbrainz","table":"label","type":"insert","ts":1563197966,"xid":23309,"commit":true,"data":{"id":27894,"gid":"e8c1f93b-f518-43f2-b9be-e00e31a5e92d","name":"XTOY","sort_name":"XTOY","type":-1}}

Checking Neo4j once more we see the node has been added in.

The data is automatically replicated across the Neo4j cluster so we don’t have to worry about that aspect.

In summary, it is straightforward to sync database changes from MySQL to Neo4j through Kafka.

Neo4j – Kafka – MySQL: Configuration – Part 1

With the new Neo4j Kafka streams now available, there have been a few articles, such as A New Neo4j Integration with Apache Kafka, How to leverage Neo4j Streams and build a just-in-time data warehouse, Processing Neo4j Transaction Events with KSQL and Kafka Streams, and How to embrace event-driven graph analytics using Neo4j and Apache Kafka. In this post, I will discuss configuring a Neo4j cluster that will use the Neo4j Kafka streams to connect to a Kafka server. I will also talk about configuring Maxwell’s Daemon to stream data from MySQL to Kafka and then on to Neo4j.

The new Neo4j Kafka streams library is a Neo4j plugin that you can add to each of your Neo4j instances. It enables three types of Apache Kafka mechanisms:

  • Producer: based on the topics set up in the Neo4j configuration file. Outputs to said topics will happen when specified node or relationship types change
  • Consumer: based on the topics set up in the Neo4j configuration file. When events for said topics are picked up, the specified Cypher query for each topic will be executed
  • Procedure: a direct call in Cypher to publish a given payload to a specified topic
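
For the Procedure option, the call from Cypher is a one-liner (the topic name and payload here are just illustrative):

CALL streams.publish('my-topic', 'Hello from Neo4j')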

You can get a more detailed overview of what each of these looks like here.

Kafka

For Kafka, I configured an AWS EC2 instance to serve as my Kafka machine. For the setup, I followed the instructions from the quick start guide up until step 2. Before we get Kafka up and running, we will need to set up the consumer elements in the Neo4j configuration files.

If you are using the Dead Letter Queue functionality in the Neo4j Kafka connector, you will have to create that topic. For the MySQL sync topics, the Maxwell Daemon will automatically create those based on the settings in the config.properties file.
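
Creating the Dead Letter Queue topic from the Kafka installation directory looks something like this (the topic name matches the streams.sink.dlq value used later; the replication factor and partition count are up to you):

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic person-dlq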

MySQL

For MySQL, I set up a simple MySQL server on Ubuntu. A couple of things to note:

  • I had to modify the /etc/mysql/my.cnf file and add:
[mysqld]
server_id=21
log-bin=master
binlog_format=row
  • I had to create a Maxwell user.
CREATE USER 'maxwell'@'%' IDENTIFIED BY 'YourStrongPassword';
GRANT ALL ON maxwell.* TO 'maxwell'@'%';
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

Maxwell’s Daemon

For the sync between MySQL and Kafka, I used Maxwell’s daemon, an application that reads MySQL binlogs and writes row updates as JSON to Kafka, Kinesis, or other streaming platforms. Maxwell has low operational overhead, requiring nothing but mysql and a place to write to. Its common use cases include ETL, cache building/expiring, metrics collection, search indexing and inter-service communication. Maxwell gives you some of the benefits of event sourcing without having to re-architect your entire platform.

I downloaded Maxwell’s Daemon and installed it on the MySQL server. I then made the following configuration changes in the config.properties file.


producer=kafka
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9092
# mysql login info
host=xxx.xxx.xxx.xxx
user=maxwell
password=YourStrongMaxwellPassword
kafka_topic=blogpost_%{database}_%{table}

By configuring the kafka_topic to be linked to the database and the table, Maxwell automatically creates a topic for each table in the database.
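
With config.properties in place, the daemon is started from the Maxwell installation directory with something like:

bin/maxwell --config config.properties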

Neo4j Cluster

Configuring a Neo4j cluster is covered in the Neo4j Operations Manual.

We will use the Neo4j Streams plugin. As the instructions say, we download the latest release jar and copy it into $NEO4J_HOME/plugins on each of the Neo4j cluster members. Then we will need to do some configuration.

In the neo4j.conf file, we will need to configure the Neo4j Kafka plugin. This configuration will be the same for all Core servers in the Neo4j cluster. The neo4j.conf configuration is as follows:


### Neo4j.conf

kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9092
streams.sink.enabled=true
streams.sink.polling.interval=1000

streams.sink.topic.cypher.Neo4jPersonTest=MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)

streams.sink.topic.cypher.musicbrainz_musicbrainz_artist=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Artist{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name)

streams.sink.topic.cypher.musicbrainz_musicbrainz_label=FOREACH(ignoreMe IN CASE WHEN event.type='insert' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid}) on match set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type on create set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type) FOREACH(ignoreMe IN CASE WHEN event.type='delete' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid})  detach delete u) FOREACH(ignoreMe IN CASE WHEN event.type='update' THEN [1] ELSE [] END | MERGE (u:Label{gid:event.data.gid})  set u.id = event.data.id, u.name=event.data.name, u.sort_name=event.data.sort_name,u.type=event.data.type)

kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=person-dlq

kafka.acks=all
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432

The Neo4j Kafka plugin polls to see which instance is the Neo4j cluster leader. The cluster leader automatically polls the Kafka topics for data changes. If the cluster leader switches, the new leader takes over the polling and retrieval from the topics.

If you want to change the number of records that Neo4j pulls from the Kafka topic, you can add this setting to your neo4j.conf file:

kafka.max.poll.records=5000

Once all of the configuration is completed, I have a running MySQL instance, a Kafka instance, a Maxwell’s Daemon configured to read from MySQL and write to Kafka topics and a Neo4j cluster that will read from the Kafka topics.

In part 2, we will show how this all works together.

Neo4j – Finding a doctor on the way to find coffee

I love coffee. I love finding new coffee shops and trying out great coffee roasters. Some of my favorites are Great Lakes Coffee, Stumptown Roasters in NYC’s Ace Hotel, Red Rooster (my choice for home delivery) and my go-to local coffee shop, The Grounds. The Grounds is owned by friends and you have to love their hashtag #LoveCoffeeLovePeople.

You are probably asking what this has to do with Neo4j or anything in general. Go pour yourself a cup of coffee and then come back to see where this leads.

In the Neo4j Uber H3 blog post, you saw how we could use the hexaddress to find doctors within a radius, a bounding box, a polygon or even along a line between locations. The line-between-locations feature got me thinking: what if I could combine it with turn-by-turn directions to find a doctor along a route? You never know when you may have had too much coffee and need to find the closest doctor. Or maybe you have a lot of event data (IED explosions, for example) and you want to see which events occurred along a proposed route.

Let’s see if we can pull this off. I remembered that I had written some Python code that works with the Google Directions API. The Directions API takes in a start address and an end address. For example:

https://maps.googleapis.com/maps/api/directions/json?origin='1700 Courthouse Rd, Stafford, VA 22554'&destination='50 N. Stafford Complex, Center St Suite 107, Stafford, VA 22556'&key='yourapikey'

The API returns a JSON document which includes routes and legs. Each leg has a start_location with a lat and lng value and an end_location with a lat and lng value. Check the link for details on the JSON format.
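
Before the Neo4j part, here is a rough sketch of that call and the leg parsing in Python (using the requests library; the addresses and API key are placeholders):

import requests

params = {
    "origin": "1700 Courthouse Rd, Stafford, VA 22554",
    "destination": "50 N. Stafford Complex, Center St Suite 107, Stafford, VA 22556",
    "key": "yourapikey"
}
resp = requests.get("https://maps.googleapis.com/maps/api/directions/json", params=params)

leg_pairs = []
for route in resp.json()["routes"]:
    for leg in route["legs"]:
        # each leg yields a start/end lat-lng pair to feed into the Neo4j procedure
        leg_pairs.append((leg["start_location"]["lat"], leg["start_location"]["lng"],
                          leg["end_location"]["lat"], leg["end_location"]["lng"]))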

In my python code, I make a call to the Directions API. I then parse the routes and associated legs (that just sounds weird) to get the start lat/lng and the end lat/lng for each leg. I can then pass the pair into my Neo4j procedure (com.dfauth.h3.lineBetweenLocations), get the results and output the providers that are along that line. Here’s an example:

neoQuery = "CALL com.dfauth.h3.lineBetweenLocations(" + str(prevLat) + "," + str(prevLng) + "," + str(endlat) + "," +  str(endlng) +") yield nodes unwind nodes as locNode match (locNode)<-[:HAS_PRACTICE_AT]-(p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode) return distinct locNode.Address1 + ' ' + locNode.CityName +', ' + locNode.StateName as locationAddress, locNode.latitude as latitude, locNode.longitude as longitude, coalesce(p.BusinessName,p.FirstName + ' ' + p.LastName) as practiceName, p.NPI as NPI order by locationAddress;"
#	    		print(neoQuery)
	    		result = session.run(neoQuery)
	    		for record in result:
	    			print(record["locationAddress"] + " " + record["practiceName"])

When I tried this from my house to The Grounds, I got these results:

On the 15.5 minute drive, I would pass about 10 doctors. Now the NPI data isn’t totally accurate but you can see what is available along the route.

I thought it was cool. My Python code (minus the API keys) is on GitHub. Refill your coffee and thanks for reading.

Neo4j – Uber H3 – Geospatial

We are going to take a slight detour from the healthcare blog series and talk about Uber H3. H3 is a hexagonal hierarchical geospatial indexing system. It comes with an API for indexing coordinates into a global grid. The grid is fully global and you can choose your resolution. The advantages and disadvantages of the hexagonal grid system are discussed here and here. Uber open-sourced the H3 indexing system and it comes with a set of Java bindings that we will use with Neo4j.

Since version 3.4, Neo4j has had a native geospatial datatype. Neo4j uses the WGS-84 and WGS-84 3D coordinate reference systems. Within Neo4j, we can index these point properties and query using the distance function or within a bounding box. Max DeMarzi blogged about this, as did Neo4j. At GraphConnect 2018, Will Lyon and Craig Taverner had a session on Neo4j and Going Spatial. These are all great resources for Neo4j and geospatial search.

Why Uber H3

Uber H3 adds some new features that can be extended through Neo4j procedures. Specifically, we want to be able to query based on a polygon, query based on a polygon with holes, and even along a line. We will use the data from our healthcare demo dataset and show how we can use H3 hexagon addresses to help our queries.

H3 Procedure

For our first procedure, we will pass in the latitude, longitude and resolution and receive a hexAddress back.

The H3 interface also allows us to query via a polygon. With Neo4j, we can query using a bounding box like so:

Neo4j Bounding Box Query
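
What that screenshot amounts to is a range predicate over an indexed point property. A rough sketch, assuming the location nodes carry a point property named location (label and coordinates here are illustrative, using the 3.4/3.5 comparison syntax):

MATCH (l:Location)
WHERE point({longitude: -77.46, latitude: 38.28}) < l.location < point({longitude: -77.28, latitude: 38.40})
RETURN count(l);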

In H3, the polygon search looks like this:

This returns in 17ms against 4.4 million locations.

If we combine this with our healthcare data, we can find providers with a certain taxonomy.
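
Combining the two follows the same pattern as the line search later in this post: call the procedure, unwind the returned location nodes, then walk to the providers and filter on the taxonomy. A sketch with an illustrative rectangle and an assumed taxonomy property:

CALL com.dfauth.h3.polygonSearch([{lon:"-77.46",lat:"38.28"},{lon:"-77.46",lat:"38.40"},{lon:"-77.28",lat:"38.40"},{lon:"-77.28",lat:"38.28"},{lon:"-77.46",lat:"38.28"}],[{}]) yield nodes
unwind nodes as locNode
match (locNode)<-[:HAS_PRACTICE_AT]-(p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode)
where t.Classification = 'Internal Medicine'      // illustrative taxonomy filter
return distinct coalesce(p.BusinessName, p.FirstName + ' ' + p.LastName) as practiceName, p.NPI as NPI;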

We can make this polygon as complicated as we need. This example is for a county polygon:

CALL com.dfauth.h3.polygonSearch([{lon:"-77.302457",lat:"38.504683"},{lon:"-77.310334",lat:"38.493926"},{lon:"-77.322622",lat:"38.467131"},{lon:"-77.32544",lat:"38.44885"},{lon:"-77.319036",lat:"38.417803"},{lon:"-77.310719",lat:"38.397669"},{lon:"-77.312201",lat:"38.390958"},{lon:"-77.314848",lat:"38.389579"},{lon:"-77.317288",lat:"38.383576"},{lon:"-77.296077",lat:"38.369797"},{lon:"-77.288145",lat:"38.359477"},{lon:"-77.28835",lat:"38.351286"},{lon:"-77.286202",lat:"38.347025"},{lon:"-77.286202",lat:"38.347024"},{lon:"-77.321403",lat:"38.345226"},{lon:"-77.339268",lat:"38.302723"},{lon:"-77.345728",lat:"38.26139"},{lon:"-77.326692",lat:"38.245136"},{lon:"-77.370301",lat:"38.246576"},{lon:"-77.39085",lat:"38.245589"},{lon:"-77.420148",lat:"38.257986"},{lon:"-77.447126",lat:"38.284614"},{lon:"-77.455692",lat:"38.301341"},{lon:"-77.467053",lat:"38.31866"},{lon:"-77.475137",lat:"38.32096"},{lon:"-77.478996",lat:"38.316693"},{lon:"-77.498754",lat:"38.32543"},{lon:"-77.506782",lat:"38.325925"},{lon:"-77.527185",lat:"38.320655"},{lon:"-77.526243",lat:"38.309531"},{lon:"-77.530289",lat:"38.309172"},{lon:"-77.54546",lat:"38.325081"},{lon:"-77.584673",lat:"38.346806"},{lon:"-77.594796",lat:"38.336022"},{lon:"-77.618727",lat:"38.367835"},{lon:"-77.634835",lat:"38.409713"},{lon:"-77.628433",lat:"38.452075"},{lon:"-77.634157",lat:"38.464508"},{lon:"-77.568349",lat:"38.520177"},{lon:"-77.530914",lat:"38.555929"},{lon:"-77.481488",lat:"38.592432"},{lon:"-77.463949",lat:"38.578686"},{lon:"-77.448683",lat:"38.580792"},{lon:"-77.395824",lat:"38.545827"},{lon:"-77.370142",lat:"38.519865"},{lon:"-77.334902",lat:"38.514569"},{lon:"-77.308138",lat:"38.499699"},{lon:"-77.310528",lat:"38.505289"},{lon:"-77.302457",lat:"38.504683"}],[{}]) yield nodes return nodes

In these examples, there are no “donut holes”. If I need a polygon donut hole to exclude an area, I can pass it in as the second parameter.

CALL com.dfauth.h3.polygonSearch([{lon:"-77.302457",lat:"38.504683"},{lon:"-77.310334",lat:"38.493926"},{lon:"-77.322622",lat:"38.467131"},{lon:"-77.32544",lat:"38.44885"},{lon:"-77.319036",lat:"38.417803"},{lon:"-77.310719",lat:"38.397669"},{lon:"-77.312201",lat:"38.390958"},{lon:"-77.314848",lat:"38.389579"},{lon:"-77.317288",lat:"38.383576"},{lon:"-77.296077",lat:"38.369797"},{lon:"-77.288145",lat:"38.359477"},{lon:"-77.28835",lat:"38.351286"},{lon:"-77.286202",lat:"38.347025"},{lon:"-77.286202",lat:"38.347024"},{lon:"-77.321403",lat:"38.345226"},{lon:"-77.339268",lat:"38.302723"},{lon:"-77.345728",lat:"38.26139"},{lon:"-77.326692",lat:"38.245136"},{lon:"-77.370301",lat:"38.246576"},{lon:"-77.39085",lat:"38.245589"},{lon:"-77.420148",lat:"38.257986"},{lon:"-77.447126",lat:"38.284614"},{lon:"-77.455692",lat:"38.301341"},{lon:"-77.467053",lat:"38.31866"},{lon:"-77.475137",lat:"38.32096"},{lon:"-77.478996",lat:"38.316693"},{lon:"-77.498754",lat:"38.32543"},{lon:"-77.506782",lat:"38.325925"},{lon:"-77.527185",lat:"38.320655"},{lon:"-77.526243",lat:"38.309531"},{lon:"-77.530289",lat:"38.309172"},{lon:"-77.54546",lat:"38.325081"},{lon:"-77.584673",lat:"38.346806"},{lon:"-77.594796",lat:"38.336022"},{lon:"-77.618727",lat:"38.367835"},{lon:"-77.634835",lat:"38.409713"},{lon:"-77.628433",lat:"38.452075"},{lon:"-77.634157",lat:"38.464508"},{lon:"-77.568349",lat:"38.520177"},{lon:"-77.530914",lat:"38.555929"},{lon:"-77.481488",lat:"38.592432"},{lon:"-77.463949",lat:"38.578686"},{lon:"-77.448683",lat:"38.580792"},{lon:"-77.395824",lat:"38.545827"},{lon:"-77.370142",lat:"38.519865"},{lon:"-77.334902",lat:"38.514569"},{lon:"-77.308138",lat:"38.499699"},{lon:"-77.310528",lat:"38.505289"},{lon:"-77.302457",lat:"38.504683"}],[{lon:'-77.530886',lat:'38.3609911'},{lon:'-77.524492',lat:'38.3411698'},{lon:'-77.524492',lat:'38.277433'},{lon:'-77.5731773',lat:'38.2774607'},{lon:'-77.594635',lat:'38.2771873'}]) yield nodes return nodes

In the 3.3.0 release of the Java bindings, H3 included the ability to return all hex addresses along a line between two points. One could combine this with a service like Google Directions to find all locations along a route. Imagine finding doctors along a route or finding all events that occurred along a route.
Here is an example that returns providers who have billing addresses along a line:

CALL com.dfauth.h3.lineBetweenLocations(38.418582, -77.385268,38.500603, -77.444288) yield nodes 
unwind nodes as locNode 
match (locNode)<-[:HAS_BILLING_ADDRESS_AT]-(p:Provider)-[:HAS_SPECIALTY]->(t:TaxonomyCode) 
return distinct locNode.Address1 + ' ' + locNode.CityName +', ' + locNode.StateName as locationAddress, locNode.latitude as latitude, locNode.longitude as longitude, coalesce(p.BusinessName,p.FirstName + ' ' + p.LastName) as practiceName, p.NPI as NPI
order by locationAddress;

Pretty neat, right? As always, the source code is available on GitHub.

Neo4j – Leveraging a graph for healthcare search

Graph-based search is intelligent: You can ask much more precise and useful questions and get back the most relevant and meaningful information, whereas traditional keyword-based search delivers results that are more random, diluted and low-quality.

With graph-based search, you can easily query all of your connected data in real time, then focus on the answers provided and launch new real-time searches prompted by the insights you’ve discovered.

Recently, I’ve been developing proof-of-concept (POC) demonstrations showing the power of graph-based search with healthcare data. In this series of blog posts, we will implement a data model that powers graph-based search solutions for the healthcare community.

Our first scenario will be to use Neo4j for some simple search and find a provider based on location and/or specialty. As my colleague Max DeMarzi says, “The optimal model is heavily dependent on the queries you want to ask”. Queries that we want to ask are things like: For the State of Virginia, can I find a provider by my location? Where is the nearest pharmacy? Can we break that down by County or by County and Specialty? The queries drive our model. The model that we will use for this scenario and one we will build on later looks like this:

What we see is that there is a Provider who has a Specialty and multiple locations (practice, billing and alternate practice). Each postal code is in a county, and postal codes are also related to each other by distance in miles. We also have loaded the DocGraph Referral/Patient Sharing Data set.

Data Sources
The US Government’s Centers for Medicare and Medicaid Services (CMS) has a whole treasure trove of data resources that we can use to load into Neo4j and use for our POC.

Our first dataset is the NPPES Data Dissemination File. This file contains information on healthcare providers in the US. The key field is the NPI, which is a unique 10-digit identification number issued to health care providers in the United States by CMS.

Our second dataset is a summary spreadsheet showing the number of patients in managed services by county. I’ll use this file to generate patients per county and distribute them across the US.

Our third dataset is the Physician and Other Supplier Public Use File (Physician and Other Supplier PUF). This file provides information on services and procedures provided to Medicare beneficiaries by physicians and other healthcare professionals. The Physician and Other Supplier PUF contains information on utilization, payment (allowed amount and Medicare payment), and submitted charges organized by National Provider Identifier (NPI), Healthcare Common Procedure Coding System (HCPCS) code, and place of service. I’ll use this file for data validation and for building out provider specialties.

Our fourth dataset is the Public Provider Enrollment Data. The Public Provider Enrollment data for Medicare fee-for-service includes providers who are actively approved to bill Medicare or have completed the 855O at the time the data was pulled from the Provider Enrollment and Chain Ownership System (PECOS). These files are populated from PECOS and contain basic enrollment and provider information, reassignment of benefits information and practice location city, state and zip.

Finally, we have a dataset that maps US Counties to US Zip Codes and a dataset that maps the distance between zip codes.

Data Downloads:
You can download the data files from:

NPPES Data Dissemination File
NUCC Taxonomy
Counties to Zip Code / FIPS
Zip Code Tabulation Area
Zip Code Distance

Data Loading

In the next post, I will document the data loading procedures that I wrote for this data.

Additional Information – Public Training (March 2019)

For the first time, Datapalooza is hosting pre-conference classes on open data.
Here is the link to sign up; you do not need to attend Datapalooza to attend the classes.
We are offering training on the following open datasets at this year’s Datapalooza:
  • Introduction to NPI and NPPES
  • Introduction to NDC data
  • Referral/Patient Sharing Data Tutorial (will cover all versions of the docgraph dataset)
  • Open Payments data

Modeling events in Neo4j to look for patterns

Recently, some of the prospects that I work with have wanted to understand event data and felt like a graph database would be the best approach. Being new to graphs, they aren’t always sure of the best modeling approach.

There are some resources available that talk about modeling events. For example, Neo4j’s own Mark Needham published a blog post showing how to model TV shows, among other events. Snowplow published a recent blog post that describes a similar data model.

One thing that we always try to stress is to build the model to effectively answer the questions you ask of the graph. In this prospect’s case, they were collecting item scans that occurred within a warehouse. Each item had a series of scans as it moved through the warehouse. Based on the individual item or even a type of item, could they discover common patterns or deviations from the normal pattern? Another use case for this would be to track how users traverse a website or how students complete an on-line course.

An initial approach might be to create a relationship between each item and the scanner. The model is shown below:

An (:Item)-[:HAS_TAG]->(:ItemTag)-[:SCANNED]->(:Scanner) is the Cypher pattern for this model.

In this model, it is easy to understand how many times an :ItemTag was scanned. Over time, however, a Scanner could easily become a supernode with millions of :SCANNED relationships attached to it. Furthermore, if we want to find common patterns, we have to look at all :SCANNED relationships, order them by a property and then do comparisons. The amount of work increases as the number of :SCANNED relationships increases. This is similar to the AIRPORT example in Max’s blog post.

So what can we do? Let’s change the model to a chain of :Scan events running from the initial scan to the final scan. For the relationship types, instead of using :NEXT or :PREVIOUS, we will name each relationship type after the scanner id. Let’s look at that model:


In this model, an :Item has an :ItemDay. This allows us to manage and search all tags by item and by day without creating supernodes. Between each :SCAN node, we use a specific relationship type that indicates the scanner. This allows us to easily find patterns within the data. Neo4j allows for over 64k custom relationship types, so you should be good with this approach.
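
Relationship types can’t be parameterized in plain Cypher, so if you are building this chain with a scanner id that arrives as a parameter, APOC’s apoc.create.relationship procedure is one way to do it. A minimal sketch, assuming scanId is the identifying property on the :Scan nodes:

MATCH (prev:Scan {scanId: $prevScanId}), (curr:Scan {scanId: $currScanId})
CALL apoc.create.relationship(prev, 'SCANNER_' + $scannerId, {}, curr) YIELD rel
RETURN rel;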

If you were modeling a TV show, you may want to track when a NEW episode was shown versus a REPEAT episode. You could model that as:
(:Episode)-[:NEW]->(:Episode)-[:REPEAT]->(:Episode)-[:REPEAT]->(:Episode)

For an item, we can find the most common patterns using the below Cypher code:

match (r:Tag)
with r
match path=(r)-[re*..50]->(:Scan)
with r.TagID as TagID,  [r IN re | TYPE(r)] as paths
order by length(path) desc 
with collect(paths) as allPaths, TagID
return count(TagID) as tagCount, head(allPaths)
order by tagCount desc;

The use of the variable length relationship allows us to easily find those patterns no matter the length. If we want to find an anomaly such as where :SCANNER_101 wasn’t the first scan, we could do something like:

match path=(a1:Tag)-[r2]->(a:Scan)-[r1:SCANNER_101]->(b:Scan)
return count(path);

With the ability to add native date/time properties to a node, we can use Cypher to calculate durations between event scans. For example, this snippet shows the minimum, maximum and average duration between scans for each type of scan.

match (a:Scan)-[r1]->(b:Scan)
with r1, duration.between(a.scanDateTime,b.scanDateTime) as theduration
return type(r1), max(theduration.minutes), min(theduration.minutes), avg(theduration.minutes)
order by type(r1) asc, avg(theduration.minutes) desc;

When modeling data in Neo4j, think of how you want to traverse the graph to get the answers that you need. The answers you need will inevitably determine the shape of the graph and the model that you choose.

Resources:

Using Amazon CloudWatch to monitor Neo4j Logs

This post describes how to set up Amazon CloudWatch Logs, which allows you to monitor, store, and access your Neo4j log files from Amazon EC2 instances, AWS CloudTrail, or other sources. You can then retrieve the associated log data from CloudWatch Logs using the Amazon CloudWatch console, the CloudWatch Logs commands in the AWS CLI, the CloudWatch Logs API, or the CloudWatch Logs SDK. Specifically, this article covers how to configure CloudWatch to monitor the neo4j.log file, configure a metric, configure an alert on the metric, and view the logs with the CloudWatch console.

Setup

Setting up CloudWatch is a straightforward process that is well documented on the CloudWatch website. You can configure CloudWatch on an existing EC2 instance or on a new EC2 instance.

CloudWatch also relies on your IAM role or access key/secret key credentials; please note the details in the links above.

Configuration

As part of the setup, you will need to configure the agent to consume Neo4j’s neo4j.log file. On an existing EC2 instance, this is done in the /etc/awslogs/awslogs.conf file. On a new EC2 instance, you will need to configure the agent configuration file.

The configuration options are described in the CloudWatch Logs Agent Reference. For Neo4j 3.0, the following configuration will work:

--------------------------------------
[neo4j.log]
datetime_format = %Y-%m-%d %H:%M:%S%f%z 
file = /home/ec2-user/neo4j3/neo4j-enterprise-3.0.0/logs/neo4j.log
log_stream_name = {instance_id}
initial_position = start_of_file
log_group_name = /neo4j/logs
--------------------------------------

Viewing the Logs

CloudWatch provides a user interface to view the log files. Once you log into your Amazon console and select CloudWatch, you will be presented with the following console:

Selecting the /neo4j/logs group brings you to a page to select your logstream:

Finally, you can select on the server id and view the actual log file:

Configuring a Metric

CloudWatch allows you to configure custom metrics to monitor events of interest. The filter and pattern syntax describes how you can configure the metric. Unfortunately for us, you can only do text searches and not regex searches. For our example, we will configure a metric to look for a master failover.

The steps to configure a custom metric are documented here. After selecting our Log Group, you will click on the Create Metric Filter button.

Create Metric

For the filter pattern, we will use the text: “unavailable as master”. When you are finished, you will Assign the metric.
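
If you prefer the AWS CLI over the console, the equivalent metric filter can be created with something like this (the filter name, metric name and namespace are my own choices; the pattern matches log events containing all three terms):

aws logs put-metric-filter \
  --log-group-name /neo4j/logs \
  --filter-name MasterFailover \
  --filter-pattern "unavailable as master" \
  --metric-transformations metricName=MasterFailover,metricNamespace=Neo4j,metricValue=1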

Configuring an Alert

CloudWatch provides the capability to be alerted when a metric crosses a threshold. We can create an alarm around our custom metric. The steps are well documented. The custom metric will show under the Custom Metrics section. You are able to name the alert, set thresholds and set the notification options.

Summary

Amazon CloudWatch Logs provides a simple and easy way to monitor your Neo4j log files on an EC2 instance. Setup is straightforward and should take no more than 15 minutes before you have logs streaming from your Neo4j instance to CloudWatch.

Accessing Hive/Impala from Neo4j

Quite frequently, I get asked how you can import data from Hadoop into Neo4j. More often than not, the request is about importing from Impala or Hive. Today, Neo4j isn’t able to directly access an HDFS file system, so we have to use a different approach.

For this example, we will use a Neo4j Unmanaged Extension. Unmanaged extensions provide us with the ability to hook into the native Java API and expand the capabilities of the Neo4j server. The unmanaged extension is reached via a REST API. With the extension, we can connect to Impala/Hive, query the database, return the data and then manipulate the data based on what we need.

Let’s create a simple example of a Java unmanaged extension which exposes a Neo4j REST endpoint that connects to a Hive instance running on another machine, queries the database and creates nodes.

What is Cloudera Impala
A good overview of Impala is in this presentation on SlideShare. Impala provides interactive, nearly ANSI-92-standard SQL queries and a JDBC connector that allows external tools to access Impala. Cloudera Impala is the industry’s leading massively parallel processing (MPP) SQL query engine that runs natively in Apache Hadoop. The Apache-licensed, open source Impala project combines modern, scalable parallel database technology with the power of Hadoop, enabling users to directly query data stored in HDFS and Apache HBase without requiring data movement or transformation.

What is Cloudera Hive
From Cloudera, “Hive enables analysis of large data sets using a language very similar to standard ANSI SQL. This means anyone who can write SQL queries can access data stored on the Hadoop cluster. This discussion introduces the functionality of Hive, as well as its various applications for data analysis and data warehousing.”

Installing a Hadoop Cluster
For this demonstration, we will use the Cloudera QuickStart VM running on VirtualBox. You can download the VM and run it in VMware, KVM or VirtualBox; I chose VirtualBox. The VM gives you access to a working Impala and Hive instance without manually installing each of the components.

Connecting to Impala/Hive
Cloudera provides a download page for the JDBC driver. Once you download the JDBC driver, you will want to unzip it and copy the jar files from the Cloudera_ImpalaJDBC4_2.5.5.1007 directory into your plugins directory.

Hortonworks
For Hortonworks, you can download a Sandbox for VirtualBox or VMWare. Once installed, you will need to set the IP address for the Hortonworks machine. The Hortonworks sandbox has the same sample data set in Hive.

Unmanaged Extension
Our unmanaged extension (code below) queries the default database in Hive and creates an Equipment node with two properties for each record in the sample_07 table. The query is hardcoded but could easily be passed in as a parameter with the POST request.

Deploying
1. Build the Jar
2. Copy target/neo4j-hadoop.jar to the plugins/ directory of your Neo4j server.
3. Copy the Cloudera JDBC jar files to your plugins directory.
4. Configure Neo4j by adding a line to conf/neo4j-server.properties:

org.neo4j.server.thirdparty_jaxrs_classes=com.neo4j.hadoop.example=/v1

5. Check that it is installed correctly over HTTP:

:GET /v1/service/helloworld

6. Query the database:

:POST /v1/service/equipment

7. Open the Neo4j browser and query the Equipment nodes:

MATCH (e:Equipment) return count(e);

Results in Neo4j
Screen Shot 2015-11-16 at 1.33.29 PM

Summary
Cloudera’s JDBC driver combined with a Neo4j unmanaged extension provides an easy and fast way to connect to a Hadoop instance and populate a Neo4j instance. While this example doesn’t show relationships, you can easily modify it to create them.

Full code for the project is on Github.

Source Code for the example is below: