Kernel Startup using Yarn with resource and request check #699
Conversation
This is great - thank you for starting this. I had a few comments, but nothing earth shattering.
def launch_process(self, kernel_cmd, **kwargs):
    # checks to see if the queue resource is available
    # if not kernel startup is not tried
    self.confirm_yarn_queue_avaibility(**kwargs)
Typo in method name.
candidate_queue_name = str(kwargs['env']['KERNEL_QUEUE'])
node_label = str(kwargs['env']['KERNEL_NODE_LABEL'])
We shouldn't require these values, nor do I think we can reliably come up with default values for them. We should pick one that is required to enable this functionality - I'm thinking KERNEL_QUEUE. If that is provided, then the others must also be present for this logic to proceed. If KERNEL_QUEUE is not provided, this logic is silently skipped. However, if KERNEL_QUEUE is provided but any of the others are not, let's log a warning message indicating the missing ones, but NOT raise an exception - just exit the method.
Probably better to use **kwargs.get('env') to avoid KeyError exceptions.
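A minimal sketch of the guard being described, assuming the method and logger names used in this PR (the warning wording is illustrative):

def confirm_yarn_queue_availability(self, **kwargs):
    env_dict = kwargs.get('env', {})
    candidate_queue_name = env_dict.get('KERNEL_QUEUE')
    if candidate_queue_name is None:
        # KERNEL_QUEUE not provided - silently skip the availability check
        return
    node_label = env_dict.get('KERNEL_NODE_LABEL')
    if node_label is None:
        # log a warning and exit the method rather than raising an exception
        self.log.warning("KERNEL_QUEUE was provided but KERNEL_NODE_LABEL is missing - "
                         "skipping the Yarn queue availability check.")
        return
    # ... proceed with the availability check ...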
Hi Kevin,
It's good that we are discussing this part of the code, which needs to be generic and is the most important.
To give you some background, we are internally using a capacity scheduler, and the queue that we specify is actually split into two partitions internally. Using the value of node_label helps us identify which queue partition we should be looking at to query its absolute used capacity.
Now, if we are to generalize this, should we just look at the queue name and query its used capacity at the parent level? Or do you suggest some other route?
Also, from what I understand, in scenarios where we are using node labels to schedule jobs,
mentioning a particular node label only schedules jobs on nodes having that label, while if you don't mention any node label when submitting a job, it will be scheduled in the default partition (plus some non-exclusive partitions in certain scenarios).
In that scenario, assuming we have a queue name specified with/without node labels, we can look in the default/actual partition respectively inside that queue.
But will this logic be generic enough? I don't think people use node partitioning to submit jobs in every scenario, or do they?
Actually, on further thought, there are various cases we will need to cover within capacity scheduling itself:
Case 1 - Where the user specifies a queue name & node label
This means the resources need to be requested exclusively on these node labels, so we will have to query this node label for its used capacity, which is already implemented above.
Case 2 - Where the user specifies a queue name without a node label
According to YARN concepts, this allows YARN to schedule the job on default nodes as well as on non-exclusive nodes to which the queue has access. This would require us to query all the partitions associated with this queue, check each partition to understand whether it is non-exclusive, and then add up the results to determine a yes or no. (Work required)
Case 3 - No node label concept is used.
I have not dived into this, but it should be simple: we can just look at the used capacity of the queue itself.
Case 4 - It's not a capacity scheduler (some other scheduler)
Open question
Thanks for the information. I am unfamiliar with YARN queueing. Adding @lresende.
However, I think if we let the existence of KERNEL_QUEUE_NAME drive this, we might be okay (for these cases). If KERNEL_QUEUE_NAME does not exist, I don't think we have any choice but to skip this code and proceed as we do today (which I assume is Case 4 for now).
How is Case 3 different from Case 2? In Case 3 you say to look into the used capacity of the queue itself, implying that KERNEL_QUEUE_NAME was specified (unless there's some default queue that is implied by this).
If we require BOTH queue name and node label to enable this logic, then we address cases 1 and 4 (correct?).
I'm not sure how important Case 2 and 3 are. Hoping others more familiar with YARN can chime in here.
@kevin-bates I think this page can help us understand scheduling in YARN in a better way when node labels are used:
https://developer.ibm.com/hadoop/2017/03/10/yarn-node-labels/
Since we were using node labels, the PR actually tries to derive the remaining queue capacities given that we have exclusive partitions (refer to the link). This covers Case 1. Case 2 is one in which no node label is specified but the system still has node label partitioning.
Case 3 - which I have not researched as of now - is one where we have not set up YARN to have exclusive/non-exclusive partitions. It is possible that in this scenario the solution is similar to Case 2, but we will have to dig a little.
@IMAM9AIS - thank you for the link.
I don't think we should get caught up in how to cover all the cases at this point. Let's identify either KERNEL_QUEUE_NAME or KERNEL_NODE_LABEL (or both) as the "drivers" that enable this logic and get your primary case covered. If you can knock out others - great. Other users that might have different use cases can adjust this method accordingly. (Please add a link to the article in a nearby comment.)
We just need to ensure that not setting these values results in today's (default) behavior.
@kevin-bates Agreed.
From what we understand, we will have to use both (queue name and node label) as drivers for this logic. I will be addressing all comments and committing these changes soon (along with the default-behaviour logic).
I have added the changes. Please have a look, @kevin-bates.
yarn_available = False

self.log.info("Yarn Endpoint to check for resources : " + self.yarn_endpoint)
self.log.info("Retrying for " + str(self.yarn_resource_check_wait_time) + " ms if resources not found ")
Can we change these to DEBUG messages?
# each time we sleep we substract equal amount of time for the kernel launch timeout
self.log.debug("Kernel Launch Timeout - {}".format(self.kernel_launch_timeout))
time.sleep(poll_interval)
self.kernel_launch_timeout -= poll_interval
Rather than update on each iteration, can we decrement self.kernel_launch_timeout by whatever the total duration was back in the calling method? We either get an exception or succeed, so it can be the last statement in that method.
def handle_yarn_queue_timeout(self):

    # each time we sleep we substract equal amount of time for the kernel launch timeout
    self.log.debug("Kernel Launch Timeout - {}".format(self.kernel_launch_timeout))
This debug isn't very useful. I'd rather we add a debug statement following the call to cluster_scheduler_queue_availability that includes some of the details we're checking for, along with the result.
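For example, something along these lines (a sketch; the exact wording is illustrative):

yarn_available = self.resource_mgr.cluster_scheduler_queue_availability(self.candidate_partition)
self.log.debug("Availability check for partition {} returned {}".format(self.candidate_partition, yarn_available))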
Looking good - thank you!
I had a few comments, mostly related to the half-second loop processing.
def confirm_yarn_queue_avaibility(self, **kwargs):
def confirm_yarn_queue_availability(self, **kwargs):
    """
    confirms if the yarn queue has capacity to handle the resource requests that
Please capitalize 'confirms' -> 'Confirms'.
I think we should also add a statement near the beginning that this algorithm is subject to change as use cases become more defined. That then would allow the removal of "Current version of check," from the second bullet below.
yarn_available = self.resource_mgr.cluster_scheduler_queue_availability(self.candidate_partition)
self.log.debug("Result: " + str(yarn_available))
# each time we sleep we substract equal amount of time for the kernel launch timeout
self.kernel_launch_timeout -= poll_interval
Let's remove any debug statements from within the loop as these will be posted every half second and nothing is changing. We can move the first statement before the loop and the fact that we didn't throw an exception implies the result was successful. (Btw, you might want to keep this in mind when looking into the yarn-api-client implementations since some will be called frequently in loops like this.)
Where does the 'capacity < 95%' come from? That sounds like a detail of checking the availability and I'm not sure we should be assuming the 95% threshold. Perhaps we can make that more generic?
My previous comment regarding decrementing self.kernel_launch_timeout was to do something like this...

while not yarn_available:
    self.handle_yarn_queue_timeout()
    yarn_available = self.resource_mgr.cluster_scheduler_queue_availability(self.candidate_partition)
check_availability_total_time = RemoteProcessProxy.get_time_diff(self.start_time, RemoteProcessProxy.get_current_time())
self.kernel_launch_timeout -= check_availability_total_time

If you'd rather decrement on each iteration, then please move it back into the timeout handler. I think it would be good to isolate the use of poll_interval to as few places as possible.
Where does the 'capacity < 95%' come from? That sounds like a detail of checking the availability and I'm not sure we should be assuming the 95% threshold. Perhaps we can make that more generic?
Sure.
One thing to note here: right now, we have not implemented the logic of checking the memory requested during kernel launch against the amount of memory available in the YARN queue.
Rather, we are checking whether the queue is more than 95% full; if it is, we don't allow the kernel to be launched. This logic appeared more generic to us, given that we can tune this threshold according to YARN configurations and admin requirements for kernel launch.
And yes, we should make this check more generic.
I will be committing files with the relevant changes.
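A sketch of the more generic form, with the threshold taken from the environment and passed into the yarn-api-client call (the YARN_-prefixed name, the 95.0 default, and the extra parameter reflect what the discussion converges on and are illustrative):

partition_availability_threshold = float(env_dict.get('YARN_PARTITION_THRESHOLD', 95.0))
yarn_available = self.resource_mgr.cluster_scheduler_queue_availability(
    self.candidate_partition, partition_availability_threshold)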
@kevin-bates Uploaded.
Cool - I like the idea of passing the required availability into the yarn-api method.
A couple of comments regarding the loop processing, logging, etc.
candidate_queue_name = (env_dict.get('KERNEL_QUEUE', None))
node_label = env_dict.get('KERNEL_NODE_LABEL', None)
partition_availability_threshold = float(env_dict.get('PARTITION_THRESHOLD', 95.0))
Please prefix the partition_threshold env name with YARN_.
self.log.debug("Yarn Endpoint to check for resources : " + self.yarn_endpoint)
self.log.debug("Checking if " + self.candidate_partition + " has used capacity <= " + str(partition_availability_threshold) + "%")
self.log.debug("Retrying for " + str(self.yarn_resource_check_wait_time) + " ms if resources not found ")
Can we please combine the yarn_endpoint statement into the 'Checking' statement?
self.log.debug("Checking {endpoint} if {partition} has used capacity...
Also, please use the string formatting used elsewhere, i.e. format(), rather than string concatenation.
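The combined statement might look something like this (a sketch; the wording is illustrative):

self.log.debug("Checking {} if {} has used capacity <= {}%".format(
    self.yarn_endpoint, self.candidate_partition, partition_availability_threshold))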
Thank you for your contribution (and patience)! We'll hold off merging until [WIP] is removed and we've updated the dependency on the updated yarn-api-client.
I should have checked the build issue. We'll also need to get rid of lint errors. Please make sure you can run
…m yarn-api-client and fixing lint issues
Looks good. Just had a couple of comments regarding logging.
self.candidate_queue = self.resource_mgr.cluster_scheduler_queue(candidate_queue_name)

if self.candidate_queue is None:
    self.log.debug("Queue: {} does not exist in the cluster".format(candidate_queue_name))
Since this is requested functionality, by virtue of a non-null KERNEL_QUEUE value, I think we should log this as a warning and add " - availability check will not be performed."
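For example (a sketch of the suggested change):

self.log.warning("Queue: {} does not exist in the cluster - availability check will not be performed.".format(candidate_queue_name))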
Committed the changes.
        self.candidate_partition,
        partition_availability_threshold))
if self.candidate_partition is None:
    self.log.debug("Parition: {} not found in {} queue".format(node_label, candidate_queue_name))
See previous comment - log as warning, add suffix statement.
Need to fix typo in 'Partition' as well.
Committed the changes.
Thank you @IMAM9AIS - this is a great contribution.
Now that yarn-api-client v0.3.6 is available, please update the minimum dependency and we'll move this forward!
@kevin-bates please have a look.
The tests can be somewhat flaky. I restarted that job and suspect things will be fine. Let's restore the dependency cap and we're good. Thanks.
@kevin-bates Please have a look.
Thanks @IMAM9AIS. Please remove [WIP] from the PR's title when you're ready for its merge.
@kevin-bates done.
Raising an initial PR for this issue:
#664
There is a separate PR to be raised on the yarn-api-client repo. We can have a discussion on this PR before moving on to the other one.