Source code for oarphpy.plotting

# Copyright 2023 Maintainers of OarphPy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

from oarphpy import util


def hash_to_rbg(x, s=0.8, v=0.8):
  """Given some value `x` (integral types work best), hash `x` to an
  `(r, g, b)` color tuple using a hue based on the hash and the given
  `s` (saturation) and `v` (value/brightness)."""
  import colorsys
  import hashlib

  import numpy as np

  # NB: ideally we'd just use __hash__(), but as of Python 3 it's not stable
  # across runs, so we use a trick based upon the Knuth multiplicative hash
  h_i = int(hashlib.md5(str(x).encode('utf-8')).hexdigest(), 16)
  h = (h_i % 2654435769) / 2654435769.
  rgb = 255 * np.array(colorsys.hsv_to_rgb(h, s, v))
  return tuple(rgb.astype(int).tolist())

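# A minimal usage sketch (the key 'train' is a made-up example).  Since the
# hue comes from a stable md5-based hash, a given key maps to the same color
# in every run and process -- handy for consistent series colors:
#
#   from oarphpy.plotting import hash_to_rbg
#   color = hash_to_rbg('train')          # -> an (r, g, b) tuple of ints
#   assert color == hash_to_rbg('train')  # deterministic
#   assert all(0 <= c <= 255 for c in color)
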
def img_to_data_uri(img, format='jpg', jpeg_quality=75):
  """Given a numpy array `img`, return a `data:` URI suitable for use in
  an HTML image tag."""
  from io import BytesIO
  out = BytesIO()

  import imageio
  kwargs = dict(format=format)
  if format == 'jpg':
    kwargs.update(quality=jpeg_quality)
  imageio.imwrite(out, img, **kwargs)

  from base64 import b64encode
  data = b64encode(out.getvalue()).decode('ascii')

  from six.moves.urllib import parse
  mime_subtype = 'jpeg' if format == 'jpg' else format
  data_url = 'data:image/{};base64,{}'.format(mime_subtype, parse.quote(data))
  return data_url

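# Usage sketch, assuming `imageio` and `numpy` are installed; the image here
# is a made-up 32x32 black square:
#
#   import numpy as np
#   from oarphpy.plotting import img_to_data_uri
#   img = np.zeros((32, 32, 3), dtype=np.uint8)
#   uri = img_to_data_uri(img, format='jpg', jpeg_quality=90)
#   html = '<img src="%s" />' % uri   # embeddable in a notebook or report
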
def get_hw_in_viewport(img_hw, viewport_hw):
  vh, vw = viewport_hw
  h, w = img_hw
  if h > vh:
    rescale = float(vh) / h
    h = rescale * h
    w = rescale * w
  if w > vw:
    rescale = float(vw) / w
    h = rescale * h
    w = rescale * w
  return int(h), int(w)


def img_to_img_tag(
    img,
    display_viewport_hw=None,  # E.g. (1000, 1000)
    image_viewport_hw=None,    # E.g. (1000, 1000)
    format='jpg',
    jpeg_quality=75):

  if image_viewport_hw is not None:
    th, tw = get_hw_in_viewport(img.shape[:2], image_viewport_hw)
    th = max(1, th)
    tw = max(1, tw)
    import cv2
    img = cv2.resize(img, (tw, th), interpolation=cv2.INTER_NEAREST)

  dh, dw = img.shape[:2]
  if display_viewport_hw is not None:
    dh, dw = get_hw_in_viewport((dh, dw), display_viewport_hw)

  src = img_to_data_uri(img, format=format, jpeg_quality=jpeg_quality)
  TEMPLATE = """<img src="{src}" height="{dh}" width="{dw}" />"""
  return TEMPLATE.format(src=src, dh=dh, dw=dw)


def _unpack_pyspark_row(r):
  """Unpack a `pyspark.sql.Row` that contains a single value."""
  # NB: 0 as in 0th column; pyspark.sql.Row provides indexing for
  # syntactic sugar
  return r[0]

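# Usage sketch for the helpers above (viewport sizes are made-up).  Note the
# distinction: `image_viewport_hw` shrinks the pixels that get encoded into
# the data URI, while `display_viewport_hw` only sets the <img> height/width
# attributes:
#
#   import numpy as np
#   from oarphpy.plotting import img_to_img_tag
#   img = np.random.randint(0, 256, (2000, 3000, 3), dtype=np.uint8)
#   tag = img_to_img_tag(
#     img,
#     image_viewport_hw=(1000, 1000),   # encode at most 1000 x 1000 pixels
#     display_viewport_hw=(500, 500))   # display at most 500 x 500 pixels
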
def df_histogram(spark_df, col, num_bins):
  """Compute and return a histogram with `num_bins` bins of the values in
  the column named `col` in the Spark DataFrame `spark_df`.  The return
  type is designed to match `numpy.histogram()`.

  NB: if your `col` has only NaNs or NULLs, then pyspark's RDD::histogram()
  call below might fail and claim the RDD is empty.
  """
  import numpy as np
  assert num_bins >= 1
  col_val_rdd = spark_df.select(col).rdd.map(_unpack_pyspark_row)
  buckets, counts = col_val_rdd.histogram(num_bins)
  return np.array(counts), np.array(buckets)

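# Usage sketch, assuming a live `SparkSession` named `spark`.  As with
# `numpy.histogram()`, the returned counts have one fewer entry than the
# returned bucket edges:
#
#   df = spark.createDataFrame([(float(x),) for x in range(100)], ['score'])
#   counts, edges = df_histogram(df, 'score', num_bins=10)
#   assert len(counts) == 10 and len(edges) == 11
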
def save_bokeh_fig(fig, dest, title=None):
  from bokeh import plotting
  if not title:
    title = os.path.split(dest)[-1]
  plotting.output_file(dest, title=title, mode='inline')
  plotting.save(fig)
  util.log.info("Wrote Bokeh figure to %s" % dest)

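# Usage sketch (the path is a made-up example).  `mode='inline'` embeds the
# BokehJS resources, so the output HTML file is self-contained:
#
#   from bokeh.plotting import figure
#   fig = figure(title='demo')
#   fig.line([1, 2, 3], [4, 5, 6])
#   save_bokeh_fig(fig, '/tmp/demo_plot.html')
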
class HistogramWithExamplesPlotter(object):
  """Create and return a Bokeh plot depicting a histogram of a single column
  in a Spark DataFrame.  Clicking on a bar in the histogram will
  interactively show examples from that bucket.

  `SUB_PIVOT_COL` - Optionally choose an additional dimension of the data
  and include histograms of the data pivoted by that dimension.  For
  example, if we are histogramming the "height" dimension over a population,
  and we set `SUB_PIVOT_COL` to the "gender" column, then we'll get a
  histogram of height over ALL genders as well as a height histogram for
  each distinct value in the "gender" column.

  The user can override how examples are displayed: subclasses can override
  `HistogramWithExamplesPlotter::display_bucket()`.

  See `HistogramWithExamplesPlotter::run()`.
  """

  ## Core Params

  NUM_BINS = 50

  SUB_PIVOT_COL = None
  # See above about SUB_PIVOT_COL -- if this is a string, then we will
  # facet the histogram using the categorical values in this column.

  WIDTH = 900
  # Bokeh's plots (especially in the single-column, two-row layout we use)
  # work best with a fixed width.  For Jupyter notebooks, a width of 900
  # pixels fits without horizontal scrolling.

  APPROX_MAX_ROWS_PER_BUCKET = -1
  # If at least one bucket has more rows than can be stored in memory, then
  # Spark will OOM trying to run `display_bucket()` on all those rows.  In
  # that case, the user likely does not need to run `display_bucket()` on
  # _all_ bucket rows, but rather on a sample of them.  Set this attribute
  # to a positive number so that `display_bucket()` sees at most about
  # `APPROX_MAX_ROWS_PER_BUCKET` rows per bucket (chosen uniformly at
  # random); a negative number disables sampling.  Enable sampling if this
  # utility OOMs on your imbalanced dataset out-of-the-box.

  APPROX_MAX_ROWS_PER_BUCKET_SEED = 1337
  # Seed for the random sampling described above.

  ## Plotting params

  TITLE = None  # By default use the DataFrame column name

  def display_bucket(self, sub_pivot, bucket_id, irows):
    import itertools
    rows_str = "<br />".join(str(r) for r in itertools.islice(irows, 5))
    TEMPLATE = """
      <b>Pivot: {spv} Bucket: {bucket_id} </b> <br/>
      {rows}
      <br/> <br/>
    """
    disp = TEMPLATE.format(spv=sub_pivot, bucket_id=bucket_id, rows=rows_str)
    return bucket_id, disp

  def _build_data_source_for_sub_pivot(self, spv, df, col):
    import numpy as np
    import pandas as pd

    util.log.info("... building data source for %s ..." % spv)
    if spv == 'ALL':
      sp_src_df = df
    else:
      sp_src_df = df.filter(df[self.SUB_PIVOT_COL] == spv)

    util.log.info("... histogramming %s ..." % spv)
    hist, edges = df_histogram(sp_src_df, col, self.NUM_BINS)

    # Use this Pandas DataFrame to serve as a Bokeh data source for the plot
    sp_df = pd.DataFrame(dict(
      count=hist,
      proportion=hist / np.sum(hist),
      left=edges[:-1],
      right=edges[1:],
    ))
    sp_df['legend'] = str(spv)

    from bokeh.colors import RGB
    sp_df['color'] = RGB(*hash_to_rbg(spv))

    util.log.info("... display-ifying examples for %s ..." % spv)
    def get_display():
      # First, re-bucket each row using what (in SQL) looks like a CASE-WHEN
      # statement:
      #   SELECT
      #     CASE
      #       WHEN 0 <= val AND val < 10 THEN 0
      #       WHEN 10 <= val AND val < 20 THEN 10
      #       ...
      #     END AS bucket, ...
      # We use the DataFrame API to construct the query because it's easier.
      # Spark will compile it to native code on-the-fly.
      from pyspark.sql import functions as F
      col_def = None
      buckets = list(zip(edges[:-1], edges[1:]))
      for bucket_id, (lo, hi) in enumerate(buckets):
        # The last Spark histogram bucket is closed on the right, but our
        # predicates below are open on the right, so nudge the final edge
        # outward to include the endpoint
        if bucket_id == len(buckets) - 1:
          hi += 1e-9
        args = (
          (sp_src_df[col] >= float(lo)) & (sp_src_df[col] < float(hi)),
          bucket_id
        )
        if col_def is None:
          col_def = F.when(*args)
        else:
          col_def = col_def.when(*args)
      col_def = col_def.otherwise(-1)
      df_bucketed = sp_src_df.withColumn('psegs_plot_bucket', col_def)

      # The data might be wildly imbalanced, and many (or even all) rows
      # might fall in a single bucket; that could lead to an OOM below.
      # Mitigate the OOM using random sampling -- the user likely does not
      # want to display-ify and see _all_ those rows anyways.
      if self.APPROX_MAX_ROWS_PER_BUCKET > 0:
        bucket_id_to_sample_frac = dict(
          (bucket_id,
            min(1., float(self.APPROX_MAX_ROWS_PER_BUCKET) / max(1, count)))
          for bucket_id, count in enumerate(hist))
        df_bucketed = df_bucketed.sampleBy(
          'psegs_plot_bucket',
          bucket_id_to_sample_frac,
          seed=self.APPROX_MAX_ROWS_PER_BUCKET_SEED)

      # Second, we collect chunks of rows partitioned by bucket ID so that
      # we can run our display function in parallel over buckets.
      bucketed_chunks = df_bucketed.rdd.groupBy(
        lambda r: r.psegs_plot_bucket)
      bucket_disp = bucketed_chunks.map(
        lambda b_irows: self.display_bucket(spv, b_irows[0], b_irows[1]))
      bucket_to_disp = dict(bucket_disp.collect())

      # Finally, return a column of display strings ordered by bucket so
      # that we can add this column to the output histogram DataFrame.
      return [
        bucket_to_disp.get(b, '')
        for b in range(len(buckets))
      ]

    sp_df['display'] = get_display()
    return sp_df

  def run(self, df, col):
    """Compute histograms and return the final plot.

    Args:
      df (pyspark.sql.DataFrame): Read from this DataFrame.  The caller may
        want to `cache()` the DataFrame, as this routine will do a variety
        of random reads and aggregations on the data.
      col (str): Use this column of `df` as the x-axis (i.e. the chosen
        metric) for the computed histogram.  Spark automatically ignores
        NULLs and NaNs.

    Returns:
      bokeh layout object with a plot.
    """
    import pyspark.sql
    assert isinstance(df, pyspark.sql.DataFrame)
    assert col in df.columns

    util.log.info("Plotting histogram for %s of %s ..." % (col, df))

    sub_pivot_values = ['ALL']
    if self.SUB_PIVOT_COL:
      distinct_rows = df.select(self.SUB_PIVOT_COL).distinct()
      sub_pivot_values.extend(
        sorted(
          distinct_rows.rdd.map(_unpack_pyspark_row).collect()))

    ## Compute a data source Pandas DataFrame for every sub-pivot
    spv_to_panel_df = dict(
      (spv, self._build_data_source_for_sub_pivot(spv, df, col))
      for spv in sub_pivot_values)

    ## Make the plot
    from bokeh import plotting
    fig = plotting.figure(
      title=self.TITLE or col,
      tools='tap,pan,wheel_zoom,box_zoom,reset',
      width=self.WIDTH,
      x_axis_label=col,
      y_axis_label='Count')

    from bokeh.models import ColumnDataSource
    from bokeh.models import Legend
    from bokeh.models import LegendItem
    legend_items = []
    for spv in sub_pivot_values:
      plot_src = spv_to_panel_df[spv]
      plot_src = ColumnDataSource(plot_src)
      r = fig.quad(
        source=plot_src, bottom=0, top='count', left='left', right='right',
        color='color', fill_alpha=0.5,
        hover_fill_color='color',
        hover_fill_alpha=1.0)
      legend_items.append(LegendItem(label=str(spv), renderers=[r]))

      from bokeh.models import HoverTool
      fig.add_tools(
        HoverTool(
          renderers=[r],
            # For whatever reason, adding a hover tool for each quad
            # makes the interface dramatically faster in the browser
          mode='vline',
          tooltips=[
            ('Sub-pivot', '@legend'),
            ('Count', '@count'),
            ('Proportion', '@proportion'),
            ('Value of %s' % col, '@left'),
          ]))

    legend = Legend(items=legend_items, click_policy='hide')
    fig.add_layout(legend, 'right')

    ## Add the 'show examples' tool and div
    from bokeh.models.widgets import Div
    ctxbox = Div(
      width=self.WIDTH,
      text=(
        "Click on a histogram bar to show examples. "
        "Click on the legend to show/hide a series."))

    from bokeh.models import TapTool
    taptool = fig.select(type=TapTool)

    from bokeh.models import CustomJS
    taptool.callback = CustomJS(
      args=dict(ctxbox=ctxbox),
      code="""
        var idx = cb_data.source.selected.indices[0];
        ctxbox.text = '' + cb_data.source.data.display[idx];
      """)

    from bokeh.layouts import column
    layout = column(fig, ctxbox)
    return layout

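# End-to-end usage sketch, assuming a live `SparkSession` named `spark`; the
# subclass, data, and column names below are made-up examples.  Subclassing
# customizes both the faceting and the per-bucket example HTML:
#
#   class HeightByGenderPlotter(HistogramWithExamplesPlotter):
#     NUM_BINS = 20
#     SUB_PIVOT_COL = 'gender'
#
#     def display_bucket(self, sub_pivot, bucket_id, irows):
#       # Show just the names of up to 3 people in the bucket
#       import itertools
#       names = ', '.join(r.name for r in itertools.islice(irows, 3))
#       return bucket_id, 'E.g. %s' % names
#
#   df = spark.createDataFrame(
#     [('alice', 'female', 167.0), ('bob', 'male', 180.0)],
#     ['name', 'gender', 'height'])
#   layout = HeightByGenderPlotter().run(df, 'height')
#   save_bokeh_fig(layout, '/tmp/height_histogram.html')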