Bug 1251977 - A DistributedCallable with defined keys is executing on local node, even if the local node does not own any of the keys
Status: ASSIGNED
Product: JBoss Data Grid 6
Classification: JBoss
Component: Infinispan
Version: 6.5.0
Hardware: Unspecified  OS: Unspecified
Priority: unspecified  Severity: unspecified
Target Milestone: ER1
Target Release: 6.5.1
Assigned To: William Burns
QA Contact: Martin Gencur
Depends On:
Blocks:
Reported: 2015-08-10 08:45 EDT by Shay Matasaro
Modified: 2015-08-13 10:58 EDT

Doc Type: Bug Fix
Type: Bug


Attachments
reproducer (2.24 KB, text/plain)
2015-08-10 08:45 EDT, Shay Matasaro


External Trackers
Tracker: JBoss Issue Tracker
ID: ISPN-5674
Priority: Major
Status: Resolved
Summary: A DistributedCallable with defined keys is executing on local node , even if the local node does not own any of the key...
Last Updated: 2016-01-20 07:07 EST

Description Shay Matasaro 2015-08-10 08:45:10 EDT
Created attachment 1061027
reproducer

A DistributedCallable which is submitted with defined keys is executing on the local node, even if the local node does not own any of the keys.

A reproducer is attached.
Comment 2 William Burns 2015-08-12 11:37:55 EDT
Shay, the point of the key input for DistributedExecutorService is not to determine how it is invoked, but rather where it is invoked.  The keys determine what node(s) to execute the callable on.  Since the local node owns all keys, it runs the callable locally.

If you notice, the setEnvironment method on the DistributedCallable is called with the keys you specified.  This way the task can then do processing for those keys if desired.
Comment 3 William Burns 2015-08-12 12:56:25 EDT
Also, as an example, I tweaked some of the tracing in your example, and running multiple instances only runs the task on one node.

Node1 Output:

Thread - Thread[pool-5-thread-1,5,main]- wburns-58709 Got here with paramSet [[runIdKey3]]
Distributed Run Id Incrementor is 2

Node2 Output:

Thread - Thread[pool-5-thread-1,5,main]- wburns-57307 Got here with paramSet [[runIdKey3]]
Distributed Run Id Incrementor is 2
Thread - Thread[remote-thread--p2-t1,5,main]- wburns-57307 Got here with paramSet [[runIdKey3]]
Thread - Thread[remote-thread--p2-t1,5,main]- wburns-57307 Got here with paramSet [[runIdKey3]]

Node3 Output:

Distributed Run Id Incrementor is 2

Node4 Output:

Distributed Run Id Incrementor is 2

As you can see from the output, node 1 started as owner, then node 2 joined and became owner, and it stayed owner through 2 more joins; those joining nodes didn't run the task.  Unless there is something else I am missing.
Comment 4 pronane 2015-08-12 13:27:33 EDT
Hi William,

From the API for DistributedExecutor:
submit
<T,K> NotifyingFuture<T> submit(DistributedTask<T> task,
                              K... input)
Submits the given DistributedTask for execution on a single Infinispan node.

input - input keys for this task, effective if and only if task is instance of DistributedCallable

DistributedCallable:
"Invoked by execution environment after DistributedCallable has been migrated for execution to a specific Infinispan node."

Why does the API mention "single" and "specific" node, if it's not determined by the inputKeys?  My understanding is that this works similarly to an EntryProcessor in Coherence, which would only execute on the node where the partition stores the key(s).

Also "Execution environment will chose an arbitrary node N hosting some or all of the keys specified as input", but we've shown the keys are not on ANY node, and the task is still being executed.  Why would it "choose" a node that doesn't have any keys?

Thanks
Paul
Comment 5 pronane 2015-08-12 13:29:32 EDT
Sorry, I meant to add above that the Key does not exist in the Cache, but it's still being executed on the local node.  Try just one node in a cluster.

"The keys determine what node(s) to execute the callable on. "

- the keys don't exist in the node, but it still executes.
Comment 6 dereed 2015-08-12 15:55:45 EDT
Hi Paul,

> Why does the API mention "single" and "specific" node, if it's not determined by the inputKeys.

"execution on a single Infinispan node" means it will always run on exactly one node in the cluster (regardless of how many keys are specified, what nodes own them, whether they exist in the cluster, etc).

The "specific Infinispan node" to run on is chosen from nodes that own at least one of the inputKeys.  But note that owning a key does *not* mean that the key currently exists in the cluster.  Key ownership is based on applying a hash function to the key.

-Dennis
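
The distinction between owning a key and storing it can be illustrated with a minimal Java sketch. It assumes a simplified, hypothetical `ownerOf` helper that picks a node from the key's hash code alone. Infinispan's real consistent hash is more involved, so this models only the idea, not the actual algorithm. The node names are made up, and a plain `HashMap` stands in for the cache.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OwnershipSketch {
    // Hypothetical stand-in for a consistent hash: the owner depends only on
    // the key's hash code and the node list, never on the cache contents.
    public static String ownerOf(String key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node1", "node2", "node3", "node4");
        Map<String, String> cache = new HashMap<>(); // empty: the key was never stored

        // The key has an owner even though no entry exists for it.
        System.out.println("stored: " + cache.containsKey("runIdKey3"));
        System.out.println("owner:  " + ownerOf("runIdKey3", nodes));
    }
}
```

With a single-node list, `ownerOf` returns that node for every key, which corresponds to the one-node case raised in comment 5: the only node owns all keys, whether or not they are stored.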
Comment 7 pronane 2015-08-13 04:52:06 EDT
Hi Dennis,

Thanks for this, but I don't think this is very clear.  How can you own something you don't have?

Thanks
Paul
Comment 8 Shay Matasaro 2015-08-13 09:40:55 EDT
@wburns

There seems to be almost no affinity between the task keys and the node that receives the task, which is very confusing from a user perspective, e.g. why is the task running when none of the keys exists?

If this is by design and based on your description, then I agree that it is not a bug, but I suggest improving things by doing one of the following:
1) Increase the affinity, i.e. send the task only to a node for which one of the keys exists and the node is the owner.
2) Switch submit(DistributedTask<T> task, K... input) to run on ANY one node, rather than implying any kind of affinity.
Comment 9 William Burns 2015-08-13 10:58:20 EDT
(In reply to Shay Matasaro from comment #8)
> @wburns
> 
> there seems to be almost no affinity between the task keys and the node that
> receives the task, which is very confusing from a user perspective, e.g. why
> is the task running when none of the keys exists

The key(s) are used solely to determine the target.  If you have 1 key in the collection, the task will always be run on the node that owns the key, irrespective of it being present.  By passing the key you know that you can gain some advantage from locality with where that key would reside.  If you have more than 1 key, then you are saying you are fine with it running on a node that owns at least one of the keys and are fine losing locality for the keys that aren't owned by that node.  If you wanted locality for a different key, that key should be provided instead.
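
As a rough illustration of the single-key versus multi-key case, the sketch below computes the set of nodes that own at least one input key. It assumes a hypothetical hash-based `ownerOf` helper with exactly one owner per key, which is a simplification and not Infinispan's real algorithm. How the executor picks among the candidates is not described in this thread, so the sketch stops at the candidate set.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class TargetSelectionSketch {
    // Hypothetical owner lookup: one owner per key, derived from the hash code.
    public static String ownerOf(String key, List<String> nodes) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    // Candidate targets: every node that owns at least one of the input keys.
    public static Set<String> candidateTargets(List<String> keys, List<String> nodes) {
        Set<String> candidates = new LinkedHashSet<>();
        for (String key : keys) {
            candidates.add(ownerOf(key, nodes));
        }
        return candidates;
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("node1", "node2", "node3", "node4");
        // One key: exactly one candidate, so the target is fully determined.
        System.out.println(candidateTargets(Arrays.asList("runIdKey3"), nodes));
        // Several keys: possibly several candidates; whichever is chosen,
        // keys owned by the other candidates lose locality.
        System.out.println(candidateTargets(
                Arrays.asList("runIdKey1", "runIdKey2", "runIdKey3"), nodes));
    }
}
```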

(In reply to Shay Matasaro from comment #8)
> 
> if this is by design and based on your description then i agree that it is
> not a bug, but I suggest to improve things by doing one of the following :
> 1) Increase the affinity i.e. send the task only to a node for which one of
> the keys exists and the node is the owner.

Dist exec doesn't let the existence of keys dictate how it is run.  It says run this task once on a node or on all nodes (submitEverywhere).  The key(s) are just used as hints and possibly arguments.  If you want something to be run only because of the existence of a key, then the task should verify its existence before running (since it has locality this check would be quite fast).  Another option is map/reduce, as this only runs if the entry exists.  This highlights the premise that dist exec is about executing code whereas map/reduce is about processing data.
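
The "verify existence before running" suggestion could look roughly like the sketch below. It is written against the JDK only: `GuardedTask` is a hypothetical class, and a `ConcurrentMap` stands in for the Infinispan `Cache` (which extends `ConcurrentMap`). A real task would implement `DistributedCallable` and receive the cache and input keys through its `setEnvironment` method; the increment is just placeholder work.

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ExistenceCheckSketch {
    // Hypothetical task that only does work for keys actually stored.
    public static class GuardedTask implements Callable<Integer> {
        private ConcurrentMap<String, Integer> cache;
        private Set<String> inputKeys;

        // Mirrors the shape of DistributedCallable.setEnvironment(cache, inputKeys).
        public void setEnvironment(ConcurrentMap<String, Integer> cache, Set<String> inputKeys) {
            this.cache = cache;
            this.inputKeys = inputKeys;
        }

        @Override
        public Integer call() {
            int processed = 0;
            for (String key : inputKeys) {
                // computeIfPresent returns null when the key is absent,
                // so missing keys are skipped without doing any work.
                if (cache.computeIfPresent(key, (k, v) -> v + 1) != null) {
                    processed++;
                }
            }
            return processed; // number of keys that existed and were updated
        }
    }

    public static void main(String[] args) {
        ConcurrentMap<String, Integer> cache = new ConcurrentHashMap<>();
        GuardedTask task = new GuardedTask();
        task.setEnvironment(cache, Collections.singleton("runIdKey3"));
        System.out.println("before put: " + task.call());
        cache.put("runIdKey3", 1);
        System.out.println("after put:  " + task.call());
    }
}
```

The task still runs on the owning node in both cases; the guard only decides whether it does anything once it is there.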

(In reply to Shay Matasaro from comment #8)
> 2) switch the submit(DistributedTask<T> task, K... input) to run on ANY one
> node, rather than implying any kind of affinity

#2 is precisely what submit(Callable<V> callable) gives you.  Note you can still have your Callable implement DistributedCallable to inject the Cache.
