Locality-sensitive Hashing: Part 2 (Moving from Spark to Onyx)

In our previous post, we briefly explained the pitfalls associated with data duplication and how Record Linkage can be used to detect duplicates in databases. In fact, in an earlier post (March 2017), we had also discussed the concepts of Record Linkage and data de-duplication on Spark.

There have been some exciting recent developments at FORMCEPT - in particular, our effort to explore execution platforms other than Apache Spark, such as Onyx. Onyx is a distributed computing platform written entirely in Clojure. Since almost 70% of our codebase has already been shifted to Clojure, adopting Onyx made sense for us. It is also part of our effort to stay on par with global programming trends, as Onyx claims to match the current Big Data heavyweights - the likes of Spark, Storm, Cascading, and Cascalog.

The purpose of this post is NOT to compare Spark and Onyx. Instead, we would like to take this opportunity to share with you our experience of writing and shipping our Locality-sensitive Hashing (LSH) implementation on Onyx.

Every Onyx implementation starts with a workflow, which defines how your code flows from task to task. Let us take a look at ours:

[sourcecode language="clojure"]
(def workflow
  [[:in :gen-hash]
   [:gen-hash :combine-by-hash]
   [:combine-by-hash :gen-candidates]
   [:gen-candidates :distinct-candidates]
   [:distinct-candidates :calculate-similarity]
   [:calculate-similarity :out]])
[/sourcecode]

As you can see, the flow of our code is :in -> :gen-hash -> :combine-by-hash -> :gen-candidates -> :distinct-candidates -> :calculate-similarity -> :out.

Each key is a task whose functionality can be defined in a catalog:

[sourcecode language="clojure"]

(defn catalog
  [params]
  [{:onyx/name :in
    :onyx/plugin :onyx.plugin.core-async/input
    :onyx/type :input
    :onyx/medium :core.async
    :onyx/batch-size batch-size
    :onyx/max-peers 1
    :onyx/doc "Reads segments from a core.async channel"}

   {:onyx/name :gen-hash
    :onyx/fn ::add-hash-to-records
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Generate hashes for every record"}

   {:onyx/name :combine-by-hash
    :onyx/fn :clojure.core/identity
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Combine by hashes"}

   {:onyx/name :gen-candidates
    :onyx/fn ::cartesian-products
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Generate candidate pairs"}

   {:onyx/name :distinct-candidates
    :onyx/fn :clojure.core/identity
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Applying overall distinct i.e. on the entire dataset"}

   {:onyx/name :calculate-similarity
    :onyx/fn ::find-levenstein-distance
    :onyx/type :function
    :onyx/batch-size batch-size
    :onyx/doc "Calculate Similarity between candidates"}

   {:onyx/name :out
    :onyx/plugin :onyx.plugin.core-async/output
    :onyx/type :output
    :onyx/medium :core.async
    :onyx/max-peers 1
    :onyx/batch-size batch-size
    :onyx/doc "Writes segments to a core.async channel"}])

[/sourcecode]

  1. :in - Reads the input from a core.async channel. We put some data on the channel first and then read it back as the input.
  2. :gen-hash - Generates a hash for every record and adds it to the original record as a :hash key-value pair
  3. :combine-by-hash - Passes the incoming records through as-is; the actual grouping by the :hash key, collecting its values into a list, happens in the window described under windows and triggers below
  4. :gen-candidates - Generates a cartesian product of the grouped candidates
  5. :distinct-candidates - Passes the incoming records through as-is; picking out the distinct records across the entire dataset happens in the window described under windows and triggers below
  6. :calculate-similarity - Calculates the similarity between two records using the Levenshtein distance (sketches of these task functions follow this list)
  7. :out - Writes the output to another core.async channel
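
For illustration, here is a minimal, hypothetical sketch of what the task functions referenced in the catalog might look like. In Onyx, a task function simply takes a segment (a map) and returns a segment or a sequence of segments. The field names (:name, :records, :pair, :distance) are assumptions we make for this sketch, not the actual MECBOT implementation:

[sourcecode language="clojure"]
;; Hypothetical sketches of the catalog's task functions. Field names
;; such as :name, :records, :pair, and :distance are assumptions made
;; for this example.

(defn add-hash-to-records
  "Attach an LSH bucket hash to a record. A real implementation would
  emit one hash per MinHash band; here we fake it with clojure.core/hash."
  [segment]
  (assoc segment :hash (hash (:name segment))))

(defn cartesian-products
  "Emit one segment per unordered candidate pair drawn from a group of
  records that landed in the same bucket. An Onyx task function may
  return a sequence of segments."
  [{:keys [records]}]
  (for [[i a] (map-indexed vector records)
        b (drop (inc i) records)]
    {:pair [a b]}))

(defn levenshtein
  "Classic dynamic-programming edit distance between two strings."
  [a b]
  (let [step (fn [prev-row [i ca]]
               (reduce (fn [row [j cb]]
                         (conj row (min (inc (peek row))             ; insertion
                                        (inc (nth prev-row (inc j))) ; deletion
                                        (+ (nth prev-row j)          ; substitution
                                           (if (= ca cb) 0 1)))))
                       [(inc i)]
                       (map-indexed vector b)))]
    (peek (reduce step
                  (vec (range (inc (count b))))
                  (map-indexed vector a)))))

(defn find-levenstein-distance
  "Annotate a candidate pair with its edit distance."
  [{:keys [pair] :as segment}]
  (assoc segment :distance (apply levenshtein (map :name pair))))
[/sourcecode]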

The section below describes how we use windows and triggers for aggregations:

[sourcecode language="clojure"]

(defn windows
  [params]
  [{:window/id :combine-by-hash-window
    :window/task :combine-by-hash
    :window/type :global
    :window/aggregation [:onyx.windowing.aggregation/collect-by-key :hash]}
   {:window/id :distinct-candidates-window
    :window/task :distinct-candidates
    :window/type :global
    :window/aggregation :org.formcept.onyx.onyx-utils.aggregation.core/onyx-distinct}])

(defn triggers
  [params]
  [{:trigger/window-id :combine-by-hash-window
    :trigger/id :combine-by-hash-trigger
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [2 :elements]
    :trigger/emit ::dump-window!}
   {:trigger/window-id :distinct-candidates-window
    :trigger/id :distinct-candidates-trigger
    :trigger/refinement :onyx.refinements/accumulating
    :trigger/on :onyx.triggers/segment
    :trigger/threshold [2 :elements]
    :trigger/emit ::dump-window!}])

[/sourcecode]

Windows capture and bucket data over time. Triggers let you release the captured data when various stimuli arrive. In our case, we have used two windows:

  1. combine-by-hash-window - For collecting records by their hash value
  2. distinct-candidates-window - For getting the distinct (unique) list of candidate subsets, using our custom onyx-distinct aggregation (see the sketch after this list)
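
Since onyx-distinct is our own custom aggregation, here is a hedged sketch of how such an aggregation could be defined. In Onyx, an aggregation is a map of functions keyed by :aggregation/init, :aggregation/create-state-update, and :aggregation/apply-state-update; our internal version may differ in detail:

[sourcecode language="clojure"]
;; A hedged sketch of a distinct aggregation; the internal
;; implementation of onyx-distinct may differ in detail.
(def onyx-distinct
  {:aggregation/init
   (fn [window] #{})                    ; start from an empty set
   :aggregation/create-state-update
   (fn [window state segment] segment)  ; the update is the segment itself
   :aggregation/apply-state-update
   (fn [window state segment]
     (conj state segment))})            ; a set keeps only distinct segments
[/sourcecode]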

For these windows, we have to specify their corresponding triggers as well:

  1. combine-by-hash-trigger - Accumulates values and releases them through ::dump-window! as soon as the collected list of records reaches a size of 2
  2. distinct-candidates-trigger - Works the same way as combine-by-hash-trigger, but for the distinct-candidates window (a sketch of the ::dump-window! emit function follows this list)
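
Both triggers emit through ::dump-window!. Below is a hedged sketch of what such an emit function might look like: a :trigger/emit function in Onyx receives the event map, the window and trigger definitions, the state event, and the accumulated extent state, and returns the segment(s) to pass downstream. The :records wrapper key is an assumption carried over from our earlier sketches:

[sourcecode language="clojure"]
;; A hedged sketch of the emit function behind ::dump-window!. The
;; :records key is an assumption; the exact shape of extent-state
;; depends on the window's aggregation.
(defn dump-window!
  [event window trigger state-event extent-state]
  {:records extent-state})
[/sourcecode]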

Flow Conditions

[sourcecode language="clojure"]
(defn flow-conditions
  [params]
  [{:flow/from :gen-hash
    :flow/to [:combine-by-hash]
    :flow/predicate ::remove-unhashed-records}
   {:flow/from :combine-by-hash
    :flow/to [:gen-candidates]
    :flow/predicate ::remove-uncollected-records}
   {:flow/from :distinct-candidates
    :flow/to [:calculate-similarity]
    :flow/predicate ::keep-only-distinct-candidates}
   {:flow/from :calculate-similarity
    :flow/to [:out]
    :lsh-onyx/ld-threshold (:ld-threshold params)
    :flow/predicate [::crosses-threshold? :lsh-onyx/ld-threshold]}])
[/sourcecode]

Flow conditions isolate the logic that decides whether or not segments should pass between tasks in a workflow, and they support a rich degree of composition with runtime parameterization.

Below is a description of all the flow conditions we have used in our code:

  1. :gen-hash -> :combine-by-hash - Removes the unhashed (original) records (segments) before they are passed downstream
  2. :combine-by-hash -> :gen-candidates - Removes the individual records (segments), letting only the collected groups emitted by the window pass downstream
  3. :distinct-candidates -> :calculate-similarity - Allows only the distinct candidate lists to pass through
  4. :calculate-similarity -> :out - Checks whether the Levenshtein distance exceeds the threshold provided by the user. If it does, the pair is not a duplicate and we drop it; if it does not, the pair is a duplicate and we allow it to flow to :out (hedged sketches of two of these predicates follow this list)
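
For illustration, here are hedged sketches of two of these predicates. An Onyx predicate receives the event map, the old segment, the new segment, and the collection of all new segments; parameters named in :flow/predicate (such as :lsh-onyx/ld-threshold) are, to our understanding, passed as additional arguments. The :hash and :distance keys follow the assumptions in our earlier sketches:

[sourcecode language="clojure"]
;; Hedged sketches of two flow predicates; the :hash and :distance keys
;; follow the assumptions made in the earlier task-function sketches.
(defn remove-unhashed-records
  "Let a segment through only if :gen-hash attached a hash to it."
  [event old-segment segment all-new]
  (contains? segment :hash))

(defn crosses-threshold?
  "Let a candidate pair flow to :out only if its edit distance stays
  within the user-provided threshold, i.e. the pair is a duplicate."
  [event old-segment segment all-new ld-threshold]
  (<= (:distance segment) ld-threshold))
[/sourcecode]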

(Please take a look at the entire code here and learn more about Onyx here)
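
To tie everything together, here is a hedged sketch of how these pieces might be assembled into a job and submitted with onyx.api/submit-job; peer-config and the scheduler choice are illustrative assumptions:

[sourcecode language="clojure"]
;; A hedged sketch of job assembly and submission. peer-config is
;; assumed to be defined elsewhere; the scheduler choice is illustrative.
(require '[onyx.api])

(defn submit-lsh-job
  [peer-config params]
  (onyx.api/submit-job
   peer-config
   {:workflow workflow
    :catalog (catalog params)
    :windows (windows params)
    :triggers (triggers params)
    :flow-conditions (flow-conditions params)
    :task-scheduler :onyx.task-scheduler/balanced}))
[/sourcecode]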

Our journey with Onyx began when we were exploring it merely as a data transformation engine. Soon, we realized its massive potential and discovered its utility for running some of our Machine Learning and statistical algorithms as well. LSH is one of the first algorithms from the MECBOT stack (our flagship unified data analysis product) that we have ported to Onyx. Next, we will be working on migrating some of our pattern detection algorithms, and we will share our experience with those as well. Stay tuned.