Intelligent user-controlled partitioning and writing distribution-aware NDB API Applications

Default partitioning

By default, Cluster will partition based on primary key

When adding rows to a table that uses MySQL Cluster as the storage engine, each row is assigned to a partition, and each partition is mastered by a particular data node in the Cluster. The best performance comes when all of the data required to satisfy a transaction is held within a single partition, so that it can be satisfied within a single data node rather than being bounced back and forth between multiple nodes, which introduces extra latency.

By default, Cluster partitions the data by hashing the primary key. This is not always optimal.

For example, if we have 2 tables, the first using a single-column primary key (sub_id) and the second using a composite key (sub_id, service_name)…

If we then add data to these (initially empty) tables, we can use the ‘explain’ command to see which partitions (and hence physical hosts) are used to store the data for this single subscriber…

The service records for the same subscriber (sub_id = 1) are split across 4 different partitions (p0, p1, p2 & p3). This means that the query results in messages being passed backwards and forwards between the 4 different data nodes, which consumes extra CPU time and incurs extra latency.
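
To see why hashing the whole primary key can scatter one subscriber's rows, here is a toy model in C++. It is only a sketch: the hash (FNV-1a) is a stand-in rather than the hash Cluster uses internally, the function names are made up for this example, and the service names in the usage note are hypothetical. The point is simply that when service_name feeds the hash the partition can change from row to row, whereas hashing sub_id alone cannot produce different partitions for the same subscriber.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Stand-in hash (64-bit FNV-1a). This is NOT the hash MySQL Cluster uses;
// it is here only because it is short and deterministic.
std::uint64_t fnv1a(const std::string& bytes) {
    std::uint64_t h = 0xcbf29ce484222325ULL;   // FNV offset basis
    for (unsigned char c : bytes) {
        h ^= c;
        h *= 0x100000001b3ULL;                 // FNV prime
    }
    return h;
}

// Model of the default: every primary-key column feeds the hash, so rows
// with the same sub_id but different service_name may land in different
// partitions.
unsigned partition_by_full_key(std::uint32_t sub_id,
                               const std::string& service_name,
                               unsigned partitions) {
    assert(partitions > 0);
    const std::string key = std::to_string(sub_id) + '\0' + service_name;
    return static_cast<unsigned>(fnv1a(key) % partitions);
}

// Model of hashing sub_id only: the result cannot depend on service_name,
// so every row for a subscriber maps to the same partition.
unsigned partition_by_sub_id(std::uint32_t sub_id, unsigned partitions) {
    assert(partitions > 0);
    return static_cast<unsigned>(fnv1a(std::to_string(sub_id)) % partitions);
}
```

Calling partition_by_full_key(1, "twitter", 4) and partition_by_full_key(1, "voicemail", 4) may or may not return the same value, while partition_by_sub_id(1, 4) gives one answer for every row belonging to subscriber 1.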

User-defined partitioning to the rescue

We can override the default behaviour by telling Cluster which fields should be fed into the hash algorithm. For our example, it’s reasonable to expect a transaction to access multiple records for the same subscriber (identified by their sub_id) and so the application will perform best if all of the rows for that sub_id are held in the same partition…

Now all of the rows for sub_id=1 from the services table are held within a single partition (p3), which is the same partition that holds the row for the same sub_id in the names table. Note that it wasn’t necessary to drop, recreate and re-provision the services table; the following command would have had the same effect:

Writing a distribution-aware application using the NDB API

Distribution unaware NDB API application

In our example, the data is nicely partitioned for optimum performance when accessing all of a subscriber’s data: a single data node holds all of it. However, there is another step to take to get the best out of your NDB API based application. By default, the NDB API will use the Transaction Coordinator (TC) on a ‘random’ data node to handle the transaction. We could get lucky and find that the guess is correct, but it’s more likely that the transaction will be sent to the wrong data node, which will then have to proxy it to the correct data node. The probability of getting it right first time falls as the number of node groups increases, and so this can prevent linear scaling.
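
As a rough illustration of that last point: if we assume (the post does not state this) that the ‘random’ choice is uniform across node groups, then the chance of landing in the right node group first time is one over the number of node groups. The function name below is made up for this sketch.

```cpp
#include <cassert>

// Chance that a uniformly random node group is the one holding the data.
// Uniform selection is an assumption of this sketch, not a documented
// property of the NDB API.
double first_time_hit_probability(unsigned node_groups) {
    assert(node_groups > 0);
    return 1.0 / node_groups;
}
```

Under that assumption, 2 node groups give a 50% chance of picking the right one and 4 node groups give 25%, so the share of transactions that need an extra hop grows as the Cluster is scaled out.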

It’s very simple to modify this behaviour so that the best data node/TC is hit first time, every time. When creating the transaction, the application can include parameters telling the NDB API one of the tables to be accessed and for what key(s). The NDB API will then use that information to identify the best TC to use…

Note that as the services table has been configured to use the same field (sub_id) for partitioning as the names table, the startTransaction method only needs to know about the namesTable as the TC that the NDB API selects will serve just as well for this subscriber’s data from the services table. The rest of the code can be found in distaware.
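
The effect of the hint can be modelled with a small, self-contained C++ simulation. This is not NDB API code and does not reproduce the listing from the post: owning_node_group, misplaced_transactions, the modulo placement rule and the fixed-seed random generator are all assumptions made for illustration. It counts how many transactions start on a coordinator outside the node group that owns the subscriber's rows, with and without a key hint.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Toy placement rule (an assumption of this sketch): the node group that
// holds a subscriber's rows is derived directly from sub_id.
unsigned owning_node_group(std::uint32_t sub_id, unsigned node_groups) {
    assert(node_groups > 0);
    return sub_id % node_groups;
}

// Counts transactions whose coordinator is NOT in the owning node group.
// aware == false: the coordinator is picked at random, ignoring the key.
// aware == true:  the key is used as a hint, so the owner is picked directly.
unsigned misplaced_transactions(std::uint32_t subscribers,
                                unsigned node_groups,
                                bool aware) {
    std::mt19937 rng(42);  // fixed seed so repeated runs behave identically
    unsigned misplaced = 0;
    for (std::uint32_t sub_id = 1; sub_id <= subscribers; ++sub_id) {
        const unsigned owner = owning_node_group(sub_id, node_groups);
        const unsigned random_tc = static_cast<unsigned>(rng() % node_groups);
        const unsigned tc = aware ? owner : random_tc;
        if (tc != owner) {
            ++misplaced;
        }
    }
    return misplaced;
}
```

In this model the hinted run never starts a transaction in the wrong place, while the unhinted run does so for some of the transactions whenever there is more than one node group.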

8 thoughts on “Intelligent user-controlled partitioning and writing distribution-aware NDB API Applications”

  1. I am trying to understand the ramifications of setting “partition by key” on the “services” table. By storing all the “services” records with the associated “names” record, aren’t we preventing the ndbd nodes from working on a query in parallel?

    If there are many hundreds or thousands of “services” records per “names” record, then wouldn’t it be better if they were spread around? So that each of the 4 nodes, for example, could pull 1/4 of the records?

    What am I missing?

    1. Hi Mike,

      in general, MySQL Cluster performs best when an individual transaction can be handled by a single data node. As all of the data is in memory, it is extremely fast for the data node to work on that data. Where it gets slower is when there needs to be lots of messaging between nodes.

      There are some cases where the MySQL Server node can be smart and break up the work between data nodes and so get the benefits of adding parallelism and if the application is only processing one transaction at a time then that might improve performance. It’s more typical with Cluster though that the applications would be working on multiple sub_ids at the same time and so that is where you leverage the power of multiple, shared-nothing data nodes.

      The good news is that it’s extremely simple to change the partitioning for an existing database and so the best answer is to try it for yourself with your schema and application 🙂


  2. Hi Andrew,

    Nice post explaining about partitioning of rows in the MySQL cluster.

    Would you happen to have an example of how to partition using ClusterJ? I’m using ClusterJ and I’m about to use partitioning to get better performance; however, I have a few points of confusion and am hoping you can clear them up for me.

    1. Since i’m using ClusterJ, would defining the partition key column(s) help if it is just defined in the CREATE TABLE DDL? (i think only mysqld would understand this).. does clusterJ understand the partition key from the DDL and automatically partition rows during inserts via this mechanism?

    2. ClusterJ uses annotations for defining partition keys. If i just define the partition key annotation for any column, would the records be automatically partitioned for any inserts that are executed? Or is it mandatory to call ‘session.setPartitionKey(Table.class, Paritition-key-value)’ during all my inserts? Considering that some tables use composite primary keys as well.

    1. Hi Jude,

      partitioning is a little different with MySQL Cluster as it defines how tables should be partitioned between node groups which can in turn have a big impact on latency and throughput. This post helps explain how to optimize your schema/application to exploit this.

      By default, the Primary Key is the partition key and is hashed to identify the partition. You can override this to indicate that only a subset of the columns from a multi-column PK should be used as the partition key. You can do this when creating the table or later using ALTER TABLE. Wherever you define it, it takes effect no matter how you access the database (i.e. through any mysqld, the NDB API or other native APIs, which all call the NDB API).

      In other words, you can pre-create the table and override the default partitioning key through any mysqld.

      I’m not certain how the persistent annotation is used within ClusterJ but it can’t hurt to set it to match what you’ve defined in your schema as it may help performance.


  3. Hi Andrew,

    I have a question on distribution aware for ndb index scan.

    On the above whitepaper, Figure 7: Optimizing Index Scans with NDB API,
    index scan set the level of parallelism to 1;
    Then index scan only run on one ndb node.

    But our product sub tables really have 3 kinds of key, so can not partition all sub records on one ndb node group, may spread on 3 node groups.
    So, we can not use dist aware solution, but we still want to check the index scan performance.

    Our table index columns **include** partition key, so MySQL should know the index scan result records are partitioned on one ndb node group.
    Then, will ndb index scan automatically limit the scan on the single target node?
    Or, ndb index scan still do the scan on all nodes? This will waste many nodes resource, since they have no result records at all.

    1. Hi Song Yi,

      note that you can still make the system distribution-aware if the sharding key is *part* of the primary key for each of your tables by specifying that column with PARTITION BY KEY. Is this possible in your case?

      To find out which data nodes are involved, you could look at how the values change for each node using the ndbinfo.counters table.

      Regards, Andrew.

  4. Andrew,

    Yes, it is our case.
    Take the services table in the above blog post as example,
    We create an index on sub_id to speed up the search by sub_id.
    Then, we invoke ndb index scan api by sub_id=1.

    Will ndb automatically limit the scan on the single target node? As sub_id is partition key.
    Note, the following dist aware api parameters are NOT set:
    startTransaction(“names”, 1)

    And I tested on lab, the ndbinfo.counters can NOT show which node are involved by index scan(range scan), because RANGE_SCANS counters are associated with the **DBTC** (transaction co-ordinator) kernel block.
    The RANGE_SCANS counter always increase on only ONE node, I guess it is the TC node.

    1. Song Yi,

      I’m afraid you’re exhausting my knowledge of using the NDB API directly. It might be time for you to open a Service Request so that the support team can look into it for you.

