Source code for distributed.deploy.cluster

import logging
import os
from weakref import ref

import dask

from .adaptive import Adaptive

from ..utils import format_bytes, PeriodicCallback, log_errors, ignoring

logger = logging.getLogger(__name__)


class Cluster(object):
    """ Superclass for cluster objects

    This expects a local Scheduler defined on the object.  It provides
    common methods and an IPython widget display.

    Clusters inheriting from this class should provide the following:

    1.  A local ``Scheduler`` object at ``.scheduler``
    2.  scale_up and scale_down methods as defined below::

        def scale_up(self, n: int):
            ''' Brings total worker count up to ``n`` '''

        def scale_down(self, workers: List[str]):
            ''' Close the workers with the given addresses '''

    This will provide a general ``scale`` method as well as an IPython widget
    for display.

    Examples
    --------

    >>> from distributed.deploy import Cluster
    >>> class MyCluster(Cluster):
    ...     def scale_up(self, n):
    ...         ''' Bring the total worker count up to n '''
    ...         pass
    ...     def scale_down(self, workers):
    ...         ''' Close the workers with the given addresses '''
    ...         pass

    >>> cluster = MyCluster()
    >>> cluster.scale(5)                       # scale manually
    >>> cluster.adapt(minimum=1, maximum=100)  # scale automatically

    See Also
    --------
    LocalCluster: a simple implementation with local workers
    """

    def adapt(self, **kwargs):
        """ Turn on adaptivity

        For keyword arguments see dask.distributed.Adaptive

        Options are remembered across calls.  Each call merges ``kwargs``
        into the options given previously, stops any existing ``Adaptive``
        instance, and creates a new one with the merged options.

        Returns
        -------
        The newly created ``Adaptive`` instance.

        Examples
        --------
        >>> cluster.adapt(minimum=0, maximum=10, interval='500ms')
        """
        # On the first call there is no ``_adaptive`` attribute yet.
        with ignoring(AttributeError):
            self._adaptive.stop()
        if not hasattr(self, '_adaptive_options'):
            self._adaptive_options = {}
        self._adaptive_options.update(kwargs)
        self._adaptive = Adaptive(self.scheduler, self, **self._adaptive_options)
        return self._adaptive

    @property
    def scheduler_address(self):
        """ Address of the local scheduler (``self.scheduler.address``) """
        return self.scheduler.address

    @staticmethod
    def _dashboard_host(address):
        """ Extract the host part of a scheduler address for use in a URL

        Handles ``proto://host:port`` addresses, bracketed IPv6 literals
        (``tcp://[::1]:8786`` -> ``[::1]``; brackets are kept because URLs
        require them) and path-style addresses such as
        ``inproc://10.0.0.1/1234/1`` -> ``10.0.0.1``.
        """
        # Drop the protocol prefix (if any), then anything after the first '/'.
        location = address.split('://', 1)[-1].split('/', 1)[0]
        if location.startswith('['):
            end = location.find(']')
            if end != -1:
                return location[:end + 1]
        return location.split(':', 1)[0]

    @staticmethod
    def _format_dashboard_link(template, host, port):
        """ Fill a dashboard link template with host, port and environment """
        # Explicit host/port take precedence over same-named environment
        # variables.  ``format(host=..., port=..., **os.environ)`` would
        # instead raise a duplicate-keyword TypeError.
        fields = dict(os.environ)
        fields['host'] = host
        fields['port'] = port
        return template.format(**fields)

    @property
    def dashboard_link(self):
        """ URL of the scheduler's dashboard

        Built from the ``distributed.dashboard.link`` config template.  The
        template may reference ``{host}``, ``{port}`` and environment
        variables.  Requires a ``'bokeh'`` entry in
        ``self.scheduler.services``; a missing entry raises ``KeyError``.
        """
        template = dask.config.get('distributed.dashboard.link')
        host = self._dashboard_host(self.scheduler.address)
        port = self.scheduler.services['bokeh'].port
        return self._format_dashboard_link(template, host, port)

    def scale(self, n):
        """ Scale cluster to n workers

        Parameters
        ----------
        n: int
            Target number of workers

        Examples
        --------
        >>> cluster.scale(10)  # scale cluster to ten workers

        See Also
        --------
        Cluster.scale_up
        Cluster.scale_down
        """
        with log_errors():
            if n >= len(self.scheduler.workers):
                self.scheduler.loop.add_callback(self.scale_up, n)
            else:
                to_close = self.scheduler.workers_to_close(
                    n=len(self.scheduler.workers) - n)
                logger.debug("Closing workers: %s", to_close)
                # Both callbacks are only scheduled here.  Nothing in this
                # method waits for retire_workers before scale_down runs.
                # NOTE(review): confirm scale_down tolerates workers that
                # are still being retired.
                self.scheduler.loop.add_callback(self.scheduler.retire_workers,
                                                 workers=to_close)
                self.scheduler.loop.add_callback(self.scale_down, to_close)

    def _widget_status(self):
        """ Return an HTML table summarising workers, cores and memory """
        workers = len(self.scheduler.workers)
        cores = sum(ws.ncores for ws in self.scheduler.workers.values())
        memory = sum(ws.memory_limit for ws in self.scheduler.workers.values())
        memory = format_bytes(memory)
        text = """
<div>
  <style scoped>
    .dataframe tbody tr th:only-of-type {
        vertical-align: middle;
    }

    .dataframe tbody tr th {
        vertical-align: top;
    }

    .dataframe thead th {
        text-align: right;
    }
  </style>
  <table style="text-align: right;">
    <tr><th>Workers</th> <td>%d</td></tr>
    <tr><th>Cores</th> <td>%d</td></tr>
    <tr><th>Memory</th> <td>%s</td></tr>
  </table>
</div>
""" % (workers, cores, memory)
        return text

    def _widget(self):
        """ Create IPython widget for display within a notebook

        The widget is built once and cached on ``self._cached_widget``.
        Building it also registers a 500 ms periodic callback on the
        scheduler (key ``'cluster-repr'``) that refreshes the status table.
        """
        try:
            return self._cached_widget
        except AttributeError:
            pass

        from ipywidgets import Layout, VBox, HBox, IntText, Button, HTML, Accordion

        layout = Layout(width='150px')

        if 'bokeh' in self.scheduler.services:
            link = self.dashboard_link
            link = '<p><b>Dashboard: </b><a href="%s" target="_blank">%s</a></p>\n' % (link, link)
        else:
            link = ''

        title = '<h2>%s</h2>' % type(self).__name__
        title = HTML(title)
        dashboard = HTML(link)

        status = HTML(self._widget_status(), layout=Layout(min_width='150px'))

        request = IntText(0, description='Workers', layout=layout)
        scale = Button(description='Scale', layout=layout)

        minimum = IntText(0, description='Minimum', layout=layout)
        maximum = IntText(0, description='Maximum', layout=layout)
        adapt = Button(description='Adapt', layout=layout)

        accordion = Accordion([HBox([request, scale]),
                               HBox([minimum, maximum, adapt])],
                              layout=Layout(min_width='500px'))
        accordion.selected_index = None
        accordion.set_title(0, 'Manual Scaling')
        accordion.set_title(1, 'Adaptive Scaling')

        box = VBox([title,
                    HBox([status,
                          accordion]),
                    dashboard])

        self._cached_widget = box

        def adapt_cb(b):
            # Log exceptions from the button callback, as scale_cb does.
            with log_errors():
                self.adapt(minimum=minimum.value, maximum=maximum.value)

        adapt.on_click(adapt_cb)

        def scale_cb(b):
            with log_errors():
                n = request.value
                # Manual scaling overrides any active adaptive scaling.
                with ignoring(AttributeError):
                    self._adaptive.stop()
                self.scale(n)

        scale.on_click(scale_cb)

        def update():
            status.value = self._widget_status()

        pc = PeriodicCallback(update, 500, io_loop=self.scheduler.loop)
        self.scheduler.periodic_callbacks['cluster-repr'] = pc
        pc.start()

        return box

    def _ipython_display_(self, **kwargs):
        """ IPython display hook: render the (cached) scaling widget """
        return self._widget()._ipython_display_(**kwargs)