Implementing Affinity Propagation with big data transformations for Apache Flink

I’ve just written up the results of some work I’ve done, as maybe some pieces of it could be useful to someone. The only testing I’ve done is checking that the results are the same as the scikit-learn implementation of AP. The generated messages are the same as scikit-learn’s, and to get the same clusters the convergence parameters need to be tuned.

    Introduction

Affinity Propagation is a clustering algorithm based on message passing that differs from other clustering algorithms like K-means in that it does not need the number of clusters to be known in advance. It was first published in Science in 2007, with a later paper published in 2009 that uses a different approximation but gives the same results (if you read them you will see that the original messages can be derived from the later publication).

The same results can be obtained with both approximations, each with a different implementation, so the first thing was choosing between them. I first worked with the binary approximation of AP (the one from the 2009 paper), mainly because I continued an existing implementation of it for Giraph. While that is the main reason, I think there are some other reasons that make it a good option:

  • It is easy to add constraints to the clustering. One example can be found in the Capacitated Affinity Propagation section (adding an upper bound to the number of clusters)
  • It seems to fit well with big data graph libraries like Giraph from Hadoop or Gelly from Flink

So the first implementation I worked on was for Giraph, and although I made some progress with it, some parts like damping, convergence, or sending messages synchronously (see Could messages be passed asynchronously in affinity propagation? in the FAQ section) made the implementation a little bit tricky. Then I saw that Flink was interested in having AP implemented and had an unfinished implementation of it, so I started adapting that implementation for Gelly. After some work and some recommendations from Flink contributors we decided to implement the original algorithm with regular data transformations in Gelly instead of the binary approach. And that is the one explained here.

    Affinity propagation

AP is a message passing algorithm where two types of messages are exchanged between nodes that have similarities defined between them, with a damping factor applied, and the exchange stops once the algorithm has converged. The algorithm is quite clear and those are the three points to implement (what is not that clear to me is how the message calculations were derived; an explanation can be found in the Derivation of affinity propagation as the max-sum algorithm in a factor graph section, but to me it is easier to follow it with the 2009 paper).

The two messages exchanged are what they call responsibility and availability. From the paper: the responsibility r(i,k), sent from data point i to candidate exemplar point k, reflects the accumulated evidence for how well-suited point k is to serve as the exemplar for point i, taking into account other potential exemplars for point i. It is calculated as follows:

r(i,k) \leftarrow s(i,k) - \max_{k' \,\textrm{s.t.}\, k' \neq k}\{a(i,k') + s(i,k')\}

And the availability a(i,k), sent from candidate exemplar point k to point i, reflects the accumulated evidence for how appropriate it would be for point i to choose point k as its exemplar, taking into account the support from other points that point k should be an exemplar. It is calculated as follows, with a different calculation for the self-availability:

a(i,k) \leftarrow \min\Big\{0,\, r(k,k) + \sum_{i' \,\textrm{s.t.}\, i' \notin \{i,k\}} \max\{0, r(i',k)\}\Big\}

a(k,k) \leftarrow \sum_{i' \,\textrm{s.t.}\, i' \neq k} \max\{0, r(i',k)\}
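
To make the update rules concrete, here is a small NumPy sketch of one (undamped) round of message updates on a dense similarity matrix. This is not the Gelly/Flink code from the repository, just an illustration of the formulas above; the function name and the vectorised first/second-maximum trick for computing the max over k' ≠ k are my own choices, similar in spirit to what scikit-learn does internally.

import numpy as np

def update_messages(S, A):
    # S: similarities s(i,k), A: current availabilities a(i,k); both dense float arrays.
    n = S.shape[0]
    idx = np.arange(n)

    # Responsibilities: r(i,k) = s(i,k) - max_{k' != k} (a(i,k') + s(i,k'))
    AS = A + S
    first_arg = AS.argmax(axis=1)
    first_max = AS[idx, first_arg]
    AS[idx, first_arg] = -np.inf              # mask the maximum to find the runner-up
    second_max = AS.max(axis=1)
    max_excl = np.repeat(first_max[:, None], n, axis=1)
    max_excl[idx, first_arg] = second_max     # where k is the argmax, use the second maximum
    R_new = S - max_excl

    # Availabilities: a(i,k) = min(0, r(k,k) + sum_{i' not in {i,k}} max(0, r(i',k)))
    #                 a(k,k) = sum_{i' != k} max(0, r(i',k))
    Rp = np.maximum(R_new, 0)
    Rp[idx, idx] = R_new[idx, idx]            # keep r(k,k) unclipped in the column sums
    col_sums = Rp.sum(axis=0)
    A_new = col_sums[np.newaxis, :] - Rp      # drop each point's own contribution
    diag = A_new[idx, idx].copy()             # self-availabilities before clipping
    A_new = np.minimum(0, A_new)
    A_new[idx, idx] = diag
    return R_new, A_new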

And this is damping as explained in the paper: when updating the messages, it is important that they be damped to avoid numerical oscillations that arise in some circumstances. Each message is set to l times its value from the previous iteration plus (1 - l) times its prescribed updated value, where the damping factor l is between 0 and 1. It is not mandatory to implement damping, but they recommend doing it. They also recommend adding some noise to avoid oscillations, though I did not implement that.
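
In code, damping is just a convex combination of the previous and the newly computed messages. A minimal sketch, assuming R_new and A_new come from an update like the one sketched above and lam is the damping factor:

lam = 0.5                         # damping factor l, between 0 and 1
R = lam * R + (1 - lam) * R_new   # damped responsibilities
A = lam * A + (1 - lam) * A_new   # damped availabilities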

Reading the FAQ section was useful to me for the implementation.

    Implementation graph

[Figure: affinity-propagation-bulk-plan — execution plan of the implementation]

This GitHub repository contains the code.

Setting up Python confluent-kafka for macOS

All steps and links are for version 3.0 of Confluent and macOS Sierra.

The first thing we need is to have the Confluent Kafka services up and running. Download the tar file under the Zip and Tar archives section from here. Uncompress the downloaded file and start/test the services. All the steps to do so can be found in the Confluent quickstart guide. It is quite straightforward and no problems should arise here.

After trying to install the confluent-kafka Python library with pip install confluent-kafka I’ve seen it crashes on macOS with the error below:

fatal error: 'librdkafka/rdkafka.h' file not found

It seems it is not finding the librdkafka headers, and the way I found to solve it is building and installing everything from scratch.

We will need librdkafka to build and install the confluent-kafka Python library, so let's do that first.

git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make
sudo make install

By default the librdkafka headers will be in /usr/local/include/librdkafka and the libraries in /usr/local/lib. We will need these paths to install the Python library, so make sure everything is there.

The next step is to install the confluent-kafka Python library; let's get it from git and build it.

git clone https://github.com/confluentinc/confluent-kafka-python

We will need to set the C_INCLUDE_PATH and LIBRARY_PATH environment variables to the paths where the librdkafka headers and libraries are before building it.

export C_INCLUDE_PATH=/usr/local/include/
export LIBRARY_PATH=/usr/local/lib/

Let's install it:

cd confluent-kafka-python
python setup.py build
sudo python setup.py install

We are done; the confluent-kafka Python library should now be ready to be imported from our project.
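
As a quick sanity check, a minimal produce/consume round trip can be run against the local broker started by the quickstart. The broker address, topic name and group id below are just placeholder assumptions; adjust them to your setup.

from confluent_kafka import Producer, Consumer

# Assumes a broker on localhost:9092 (the quickstart default) and a topic named 'test'.
p = Producer({'bootstrap.servers': 'localhost:9092'})
p.produce('test', value='hello from confluent-kafka')
p.flush()

c = Consumer({'bootstrap.servers': 'localhost:9092',
              'group.id': 'smoke-test',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['test'])
msg = c.poll(10.0)                 # wait up to 10 seconds for the message
if msg is None:
    print('no message received')
elif msg.error():
    print('consumer error: {}'.format(msg.error()))
else:
    print('received: {}'.format(msg.value()))
c.close()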