
                   _________________________
                  < OarphPy!! Oarph! Oarph! >
                  <   OarphKit for Python!! >
                   -------------------------
                                 \
                                  \
  ____                __   ___       -~~~~-
 / __ \___ ________  / /  / _ \__ __|O __ O|
/ /_/ / _ `/ __/ _ \/ _ \/ ___/ // /|_\__/_|__-
\____/\_,_/_/ / .__/_//_/_/   \_,---(__/\__)---
          .--/_/             /___/ /  ~--~
     ,__;`  o __`'.          _,..-/  | \/ |
     '  `'---'  `'.'.      .'.'` |   | /\ |   |
                   .'-...-`.'  _/ /\__    __/\ \_
                     -...-`  ~~~~~    ~~~~    ~~~~~

OarphPy is a collection of Python utilities for Data Science with PySpark and Tensorflow. Related (but orthogonal) to OarphKit.

Quickstart

Install from PyPI: pip install oarphpy. We test OarphPy in a variety of environments (see below), so it should play well with your Jupyter/Colab notebook or project environment. To include all extras, use pip install oarphpy[all].

Or use the dockerized environment hosted on DockerHub:

$ ./oarphcli --shell
-- or --
$ docker run -it --net=host oarphpy/full bash

See also API documentation.

Demos

Dockerized Development Environments

OarphPy is built and tested in a variety of environments to ensure the library works with and without optional dependencies. These environments are shared on DockerHub and defined in the docker subdirectory of this repo:

  • oarphpy/full – Includes Tensorflow, Jupyter, a binary install of Spark, and other tools like Bokeh. Use this environment for ad-hoc data science or as a starter for other projects.

  • oarphpy/base-py2 – Tests oarphpy in a vanilla Python 2.7 environment to ensure clean interop with other projects.

  • oarphpy/base-py3 – Tests oarphpy in a vanilla Python 3 environment to ensure clean interop with other projects.

  • oarphpy/spark – Tests oarphpy with a vanilla install of PySpark to ensure basic compatibility.

  • oarphpy/tensorflow – Tests oarphpy with Tensorflow 1.x to ensure basic compatibility (e.g. of oarphpy.util.tfutil).

Development

See ./oarphcli --help for the development and release workflow.

OarphPy API Documentation

oarphpy.spark

oarphpy.spark.for_each_executor(spark, thunk)[source]

Evaluate thunk on every executor in the cluster. Spark does not officially provide an API for this (the number of executors can change over the course of a job), so this helper uses best-effort techniques to evaluate thunk at most once per executor.
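
A minimal sketch of usage; warm_up here is a hypothetical per-executor initialization function:

>>> from oarphpy import spark as S
>>> spark = S.SessionFactory.getOrCreate()
>>> def warm_up():
...   # Hypothetical: e.g. prime a per-executor cache or download an asset
...   import os
...   return os.getpid()
>>> S.for_each_executor(spark, warm_up)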

oarphpy.spark.cluster_cpu_count(spark)[source]

Like os.cpu_count() but for an entire Spark cluster. Useful for scaling memory-intensive jobs that use the RDD API.
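
Continuing the session sketch above (the reported count depends on your cluster):

>>> S.cluster_cpu_count(spark)   # e.g. 8 for a single-node cluster on an 8-core machine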

oarphpy.spark.union_dfs(*dfs)[source]

Return the union of a sequence of DataFrames and attempt to merge the schemas of each (i.e. the union of all columns). Based upon https://stackoverflow.com/a/40404249
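
A minimal sketch, assuming the spark session from the example above:

>>> from pyspark.sql import Row
>>> from oarphpy.spark import union_dfs
>>> df1 = spark.createDataFrame([Row(a=1)])
>>> df2 = spark.createDataFrame([Row(a=2, b='x')])
>>> unioned = union_dfs(df1, df2)
>>> # unioned has rows from both inputs and the merged set of columns {'a', 'b'}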

oarphpy.spark.get_balanced_sample(spark_df, col, n_per_category=None, seed=1337)[source]

Given a column col in spark_df, return a balanced sample (countering class imbalances in spark_df[col]). Optionally limit the sample to having up to n_per_category examples for every distinct categorical value of spark_df[col].
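
For example, a sketch (the particular rows sampled depend on the seed):

>>> from oarphpy.spark import get_balanced_sample
>>> df = spark.createDataFrame(
...     [('cat',), ('cat',), ('cat',), ('dog',)], ['label'])
>>> sample = get_balanced_sample(df, 'label', n_per_category=1)
>>> # sample contains at most one row per distinct value of 'label'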

oarphpy.spark.cluster_get_info(spark)[source]

Return a text report showing various details about each worker in the cluster.

oarphpy.spark.test_pi(spark)[source]

Run the “textbook” Monte Carlo Pi Sampling demo and assert correctness. When run on a cluster, this test can help suss out networking issues between the master and worker machines.

oarphpy.spark.test_egg(spark, modname='oarphpy', test_egg_contents=<function _op_test>)[source]

Test the egg that SessFactory (below) dynamically builds and includes in spark jobs. We’ll run the function test_egg_contents on each worker to actually execute code included in the egg.

oarphpy.spark.test_tensorflow(spark)[source]

Remotely test Tensorflow support in the given Spark cluster.

class oarphpy.spark.CounterAccumulator[source]
zero(value)[source]

Provide a “zero value” for the type, compatible in dimensions with the provided value (e.g., a zero vector)

addInPlace(value1, value2)[source]

Add two values of the accumulator’s data type, returning a new value; for efficiency, can also update value1 in place and return it.
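
A hedged sketch of typical usage, assuming the accumulator value behaves like a collections.Counter:

>>> from collections import Counter
>>> from oarphpy.spark import CounterAccumulator
>>> sc = spark.sparkContext
>>> counts = sc.accumulator(Counter(), CounterAccumulator())
>>> def tally(x):
...   # Accumulators may be updated from within Spark tasks
...   counts.add(Counter({'evens': 1}) if x % 2 == 0 else Counter())
>>> sc.parallelize(range(10)).foreach(tally)
>>> counts.value   # expected: Counter({'evens': 5})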

class oarphpy.spark.SessionFactory[source]

This class is a factory for pre-configured SparkSession instances. A primary feature of this factory is that it automagically includes the caller's python module as a Spark PyFile, thus effectively shipping the user's python project with the Spark job. This class also helps centralize project-wide Spark configuration. This class is designed as a programmatic replacement for the spark-submit shell script.

To create and use a session:

>>> from oarphpy import spark as S
>>> spark = S.SessionFactory.getOrCreate()
>>> S.num_executors(spark)
1

Or using as a context manager:

>>> with S.SessionFactory.sess() as spark:
...     print(S.num_executors(spark))
1

See LocalK8SSpark below for an example of how to subclass this factory for your own project. See also NBSpark below for an example subclass that enables interop with the sparkmonitor package for jupyter.

classmethod create_egg(force_new=False)[source]

Build a Python Egg from the current project and return a path to the artifact. Unless force_new is set, the returned path may point to a cached, previously built egg. The 'current project' is either class-defaulted or auto-deduced.

Why an Egg? pyspark supports zipfiles and egg files as Python artifacts. One might wish to use a wheel instead of an egg. See this excellent article and repo:

The drawbacks to using a wheel include:
  • Wheels often require native libraries to be installed (e.g. via apt-get), and those deps are typically best baked into the Spark Worker environment (rather than installed on every job run).

  • The BdistSpark example above is actually rather slow, especially when Tensorflow is a dependency, and BdistSpark must run before every job is submitted.

  • Spark treats wheels as zip files and unzips them on every run; this unzip operation can be very expensive if the zipfile contains large binaries (e.g. Tensorflow).

  • Wheels are not yet officially supported: https://issues.apache.org/jira/browse/SPARK-6764

In comparison, an Egg provides the main benefits we want (to ship project code, often pre-committed code, to workers).

classmethod getOrCreate()[source]

Spark sessions are typically instantiated using the Builder.getOrCreate() or SparkSession.getOrCreate() methods. This method is a drop-in replacement that uses class-specified defaults, includes the local python module as a PyFile, etc.

Returns:

A pyspark.sql.session.SparkSession instance.

class oarphpy.spark.LocalK8SSpark[source]

Example of how to subclass the Spark factory above for use with K8S

classmethod getOrCreate()[source]

Spark sessions are typically instantiated using the Builder.getOrCreate() or SparkSession.getOrCreate() methods. This method is a drop-in replacement that uses class-specified defaults, includes the local python module as a PyFile, etc.

Returns:

A pyspark.sql.session.SparkSession instance.

class oarphpy.spark.NBSpark[source]

NBSpark is a session builder for local Jupyter notebooks. Also includes support for the sparkmonitor jupyter package, see https://krishnan-r.github.io/sparkmonitor/

classmethod getOrCreate()[source]

Spark sessions are typically instantiated using the Builder.getOrCreate() or SparkSession.getOrCreate() methods. This method is a drop-in replacement that uses class-specified defaults, includes the local python module as a PyFile, etc.

Returns:

A pyspark.sql.session.SparkSession instance.

class oarphpy.spark.Tensor[source]

An ndarray-like object designed to store numpy arrays in Parquet / Spark SQL format. Spark’s DenseVector and Matrix unfortunately don’t support arbitrary tensor shape. Furthermore, Tensor stores data in an explicit order accessible to external readers such as Eigen in C++ or nd4j / BLAS wrappers in Java.

class oarphpy.spark.CloudpickeledCallable(func=None)[source]

Wraps callable objects (e.g. functions, including lambdas) and uses cloudpickle for serialization. Spark uses cloudpickle for serializing _tasks_ (e.g. map functions) but uses pickle for serializing _data_. In particular, data in a Spark RDD or DataFrame must be pickleable. CloudpickeledCallable provides a wrapper so that you can embed Python functions as data in RDDs, DataFrames, and other forms of data at rest (e.g. pickle files or Parquet data).

Note that cloudpickle can be selective about how much of the object tree that it serializes; some imports and globals may get ignored. When you deserialize and invoke a CloudpickeledCallable, your interpreter should have the same (or similar) code as that used when serializing the callable, otherwise behavior may be difficult to predict.

Note that cloudpickle can’t handle non-serializable data like thread local variables, mutices, etc. CloudpickeledCallable won’t work for all code.

CloudpickeledCallable is useful for embedding flyweights in your dataset. (FMI see <https://en.wikipedia.org/wiki/Flyweight_pattern> ) For example:

>>> def load_matrix(path):
...   import numpy as np
...   return np.loadtxt(path)
>>> my_db_row = {
...     'path': 'path/to/data.txt',
...     'factory':
...        CloudpickeledCallable(lambda: load_matrix('path/to/data.txt'))
... }
>>> import pickle
>>> pickle.dump(my_db_row, open('dumped.pkl', 'wb'))

Now if you deserialize my_db_row from disk and run my_db_row['factory'](), your load_matrix() helper will get invoked with the embedded path. Thus your my_db_row is a flyweight for the data in 'path/to/data.txt'.
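
Continuing the sketch above (assuming 'path/to/data.txt' exists and is loadable by numpy.loadtxt):

>>> row = pickle.load(open('dumped.pkl', 'rb'))
>>> matrix = row['factory']()   # lazily invokes load_matrix('path/to/data.txt')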

class oarphpy.spark.RowAdapter[source]

Transforms between custom objects and `pyspark.sql.Row`s used in Spark SQL or Parquet files. Use to encode numpy arrays and standard Python objects with a transparent Parquet schema that is accessible to other readers.

Usage:
  • Use RowAdapter.to_row() in place of the pyspark.sql.Row constructor.

  • Call RowAdapter.from_row() on any pyspark.sql.Row instance, e.g. within an RDD.map() call or after a DataFrame.collect() call.

  • Decoding requires Python objects to have an available zero-arg __init__().

Unfortunately, we can’t use Spark’s UDT API to embed this adapter (and obviate user calls) because UDTs require schema definitions. Furthermore, Spark <=2.x could not handle UDTs nested in maps or lists; i.e. [UDT()] (i.e. a list of UDTs) and {‘foo’: UDT()} (i.e. a map with UDT values) would cause Spark to crash. Moreover, for ndarray data, Spark’s ml.linalg package coerces all data to floats.

Benefits of RowAdapter:
  • Transparently handles numpy arrays and numpy boxed scalar types (e.g. np.float32).

  • Deep type adaptation; supports nested types.

  • At the decode stage, supports evolution of object types independent of the schema of data at rest:
    • Added object fields don't get set unless there's a recorded value.
    • Removed object fields get ignored.
    • NB: Fields that change type get set with the data at rest; if you need to change a field's type, consider adding a new field instead.

  • Handles slotted Python objects (which Spark currently does not support), as well as un-slotted objects (where Spark supports automatic encoding but not decoding).

  • Enables saving objects and numpy arrays to Parquet in a format accessible to external systems (no additional SERDES library required).

  • Uses cloudpickle to serialize CloudpickeledCallable-wrapped functions.
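
A minimal sketch of the round trip, assuming a simple slotted class with a zero-arg-callable __init__ (the exact layout of the encoded Row is an implementation detail):

>>> import numpy as np
>>> from oarphpy.spark import RowAdapter
>>> class Obs(object):
...   __slots__ = ('frame_id', 'image')
...   def __init__(self, **kwargs):
...     # Zero-arg-callable __init__, as required for decoding
...     for k in self.__slots__:
...       setattr(self, k, kwargs.get(k))
>>> row = RowAdapter.to_row(Obs(frame_id=1, image=np.zeros((2, 2))))
>>> decoded = RowAdapter.from_row(row)   # an Obs with the numpy array restored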

oarphpy.spark.spark_df_to_tf_dataset(spark_df, shard_col, spark_row_to_tf_element, tf_element_types, tf_output_shapes=None, non_deterministic_element_order=True, num_reader_threads=-1, logging_name='spark_tf_dataset')[source]

Create a tf.data.Dataset that reads from the Spark DataFrame spark_df. Executes parallel reads using Tensorflow's internal (native code) threadpool. Each thread reads a single Spark partition at a time.

This utility is similar to Petastorm's make_reader() but is far simpler and leverages Tensorflow's built-in threadpool (so we let Tensorflow do the read scheduling). Status: alpha-quality; some perf quirks.

Parameters:
  • spark_df (pyspark.sql.DataFrame) – Read from this Dataframe.

  • shard_col (str) – Implicitly shard the dataset using this column; read one shard per reader thread at a time to conserve memory.

  • spark_row_to_tf_element (func) – Use this function to map each pyspark.sql.Row in spark_df to a tuple that represents a single element of the induced TF Dataset.

  • tf_element_types (tuple) – The types of the elements that spark_row_to_tf_element returns; e.g. (tf.float32, tf.string).

  • tf_output_shapes (tuple) – Optionally specify the shape of the output of spark_row_to_tf_element; e.g. (tf.TensorShape([]), tf.TensorShape([None])) (where the former return element is a single scalar and the latter is a list)

  • non_deterministic_element_order (bool) – Allow the resulting tf.data.Dataset to have elements in non-deterministic order for speed gains.

  • num_reader_threads (int) – Tell Tensorflow to use this many reader threads, or use -1 to provision one reader thread per CPU core.

  • logging_name (str) – Log progress under this name.

Returns:

The induced TF Dataset with one element per row in spark_df.

Return type:

tf.data.Dataset
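
A hedged sketch of typical usage, assuming a TF 1.x-style tf.data pipeline, the spark session from the SessionFactory example above, and a DataFrame with columns 'id' and 'x':

>>> import tensorflow as tf
>>> from oarphpy.spark import spark_df_to_tf_dataset
>>> df = spark.createDataFrame([(i, float(i)) for i in range(100)], ['id', 'x'])
>>> ds = spark_df_to_tf_dataset(
...       df,
...       shard_col='id',
...       spark_row_to_tf_element=lambda row: (row.x,),
...       tf_element_types=(tf.float32,))
>>> # ds is a tf.data.Dataset yielding one (x,) tuple per row of df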

oarphpy.plotting

oarphpy.plotting.hash_to_rbg(x, s=0.8, v=0.8)[source]

Given some value x (integral types work best), hash x to an (r, g, b) color tuple using a hue based on the hash and the given s (saturation) and v (value).
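
For example (a sketch; the exact tuple values are implementation-defined but stable for a given input):

>>> from oarphpy.plotting import hash_to_rbg
>>> hash_to_rbg('category_a')   # -> an (r, g, b) tuple; the same input always maps to the same color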

oarphpy.plotting.img_to_data_uri(img, format='jpg', jpeg_quality=75)[source]

Given a numpy array img, return a data: URI suitable for use in an HTML image tag.

oarphpy.plotting.df_histogram(spark_df, col, num_bins)[source]

Compute and return a histogram of the values of the column named col in the Spark DataFrame spark_df, using num_bins bins. The return type is designed to match numpy.histogram().

NB: if your col has only NaNs or NULLs, then pyspark’s RDD::histogram() call below might fail and claim the RDD is empty.
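
A minimal sketch, assuming the spark session from the examples above:

>>> from oarphpy.plotting import df_histogram
>>> df = spark.createDataFrame([(float(i),) for i in range(100)], ['score'])
>>> counts, edges = df_histogram(df, 'score', 10)
>>> # As with numpy.histogram(): len(counts) == 10 and len(edges) == 11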

class oarphpy.plotting.HistogramWithExamplesPlotter[source]

Create and return a Bokeh plot depicting a histogram of a single column in a Spark DataFrame. Clicking on a bar in the histogram will interactively show examples from that bucket.

SUB_PIVOT_COL - Optionally choose an additional dimension of the data and include histograms of the data pivoted by that dimension. For example, if we are histogramming the “height” dimension over a population, and we set SUB_PIVOT_COL to the “gender” column, then we’ll get a histogram of height over ALL genders as well as height histograms for each distinct value in the “gender” column.

To customize how examples are displayed, subclasses can override HistogramWithExamplesPlotter::display_bucket().

See HistogramWithExamplesPlotter::run().

run(df, col)[source]

Compute histograms and return the final plot.

Parameters:
  • df (pyspark.sql.DataFrame) – Read from this DataFrame. The caller may want to cache() the DataFrame as this routine will do a variety of random reads and aggregations on the data.

  • col (str) – The computed histogram uses this column of df as its x-axis metric. Spark automatically ignores nulls and NaNs.

Returns:

bokeh layout object with a plot.
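
A hedged sketch of end-to-end usage (df and 'score' as in the df_histogram sketch above; saving the figure via Bokeh is shown only as one option):

>>> from bokeh import plotting as bokeh_plotting
>>> from oarphpy.plotting import HistogramWithExamplesPlotter
>>> plotter = HistogramWithExamplesPlotter()
>>> fig = plotter.run(df, 'score')
>>> bokeh_plotting.output_file('score_histogram.html')
>>> bokeh_plotting.save(fig)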

oarphpy.util.thruput_observer

class oarphpy.util.thruput_observer.ThruputObserver(name='', log_on_del=False, only_stats=None, log_freq=100, n_total=None, n_total_chunks=None)[source]

A utility for measuring the runtime and throughput of a subroutine. Similar in spirit to tqdm, except ThruputObserver:

  • Tracks not just time but a size metric (e.g. memory) in bytes

  • Reports percentiles

  • Simply logs strings and is not terminal-interactive

While tqdm is useful for notebooks, ThruputObserver seeks to be more useful for longer-running batch jobs.

observe(n=0, num_bytes=0)[source]

NB: contextmanagers appear to be expensive due to object creation. Use ThruputObserver#{start,stop}_block() for <10ms ops. FMI https://stackoverflow.com/questions/34872535/why-contextmanager-is-slow
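
A hedged sketch, assuming observe() is used as a context manager around each unit of work and that printing the observer emits its stats report:

>>> from oarphpy.util.thruput_observer import ThruputObserver
>>> chunks = [b'\x00' * 1024 for _ in range(100)]
>>> obs = ThruputObserver(name='copy_chunks', n_total=len(chunks))
>>> for chunk in chunks:
...   with obs.observe(n=1, num_bytes=len(chunk)):
...     _ = bytes(chunk)   # the work being measured
>>> print(obs)   # runtime, throughput, and percentiles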

classmethod union(thruputs)[source]

Support reduction for use in e.g. MapReduce jobs as a counter

static monitoring_tensor(name, tensor, **observer_init_kwargs)[source]

Monitor the size of the given tensorflow Tensor and record a text TF Summary with the contents of this ThruputObserver.

static wrap_func(func, **observer_init_kwargs)[source]

Decorate func and observe a block on each call

oarphpy.util.tfutil

oarphpy.util.tfutil.give_me_frozen_graph(checkpoint, nodes=None, blacklist=None, base_graph=None, sess=None, saver=None)[source]

Tensorflow has several ways to load checkpoints / graph artifacts. It's impossible to know if some API is stable or if tomorrow somebody will invent something new and break everything because PyTorch is shiny (e.g. TF Eager). Sam Abrahams wrote a book on Tensorflow ( https://www.amazon.com/TensorFlow-Machine-Intelligence-hands–introduction-ebook/dp/B01IZ43JV4/ ) and one time couldn't tell me definitively which API to use. What's more is that freeze_graph.py is an optional script instead of a library module in Tensorflow. Chaos!!

So, based upon spark-dl’s strip_and_freeze_until() ( https://github.com/databricks/spark-deep-learning/blob/4daa1179f498df4627310afea291133539ce7001/python/sparkdl/graph/utils.py#L199 ), here’s a utility for getting a frozen, serializable, pyspark-friendly graph from a checkpoint artifact metagraph thingy I have no idea.

oarphpy.util.tfutil.tf_variable_summaries(var, prefix='')[source]

Create Tensorboard summaries showing basic stats of the variable var.

class oarphpy.util.tfutil.TFRecordsFileAsListOfStrings(fileobj)[source]

Friends Don’t Let Friends Use TFRecords.

This utility provides a Tensorflow-free, minimal-dependency solution for reading TFRecords from a file stream (e.g. a buffered reader) and exposes random access over the stream’s records.

Based upon:

oarphpy.util.misc

oarphpy.util.misc.get_size_of_deep(v)[source]

(Hacky) Get size of the value v in bytes. Does not rely on a more precise library like guppy or pympler. Intended for values v that contain large binary blobs.
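
For example (a rough sketch; the reported size is approximate by design):

>>> from oarphpy.util.misc import get_size_of_deep
>>> get_size_of_deep({'image': b'\x00' * 1000})   # roughly 1000 bytes plus container overhead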

oarphpy.util.misc.stable_hash(x)[source]

A hash of x that is stable across program runs.

Background: As of Python 3, hash() is given a fresh seed every time the interpreter starts; hash codes are not stable across runs unless the env var PYTHONHASHSEED is set.

Could we simply adjust for the seed programmatically? While it is possible to get the hash seed at runtime, Python doesn't use the seed in an easily-inverted way. Thus, for stability (and even light portability), we leverage Python serialization to provide a key for x.
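
For example (the exact integer is unspecified, but it is the same on every run):

>>> from oarphpy.util.misc import stable_hash
>>> stable_hash('moof') == stable_hash('moof')   # also stable across interpreter restarts
True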

oarphpy.util.misc.ichunked(seq, n)[source]

Generate chunks of seq of size (at most) n. More efficient, and with less junk, than the itertools recipes version based on izip_longest.
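
For example:

>>> from oarphpy.util.misc import ichunked
>>> [list(chunk) for chunk in ichunked(range(5), 2)]
[[0, 1], [2, 3], [4]]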

oarphpy.util.misc.roundrobin(*seqs)[source]

Generate a sequence pulling round-robin from each of seqs; similar to itertools.roundrobin() recipe but

  1. won’t hide nested `StopIteration`s

  2. uses a queue to reduce dynamic allocations
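
For example (a sketch; the shown order assumes the usual round-robin interleaving):

>>> from oarphpy.util.misc import roundrobin
>>> list(roundrobin('AB', 'C', 'DE'))   # expected: ['A', 'C', 'D', 'B', 'E']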

oarphpy.util.misc.as_row_of_constants(inst)[source]

Row-ify an object instance inst keeping only the “class-constant” attributes of inst, i.e. the members with UPPERCASE names.

>>> class Foo(object):
...   CONST = 5
...   def __init__(self, x=0):
...     self.x = x
>>> as_row_of_constants(Foo())
OrderedDict([('CONST', 5)])

class oarphpy.util.misc.Proxy(instance)[source]

A thin wrapper around an instance that supports custom semantics.

oarphpy.util.misc.quiet()[source]

Silence stdout and stderr for any commands in this context

oarphpy.util.misc.with_cwd(path)[source]

Use a current working directory of path for this context
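
A minimal sketch:

>>> import os
>>> from oarphpy.util.misc import with_cwd
>>> with with_cwd('/tmp'):
...   print(os.getcwd())   # relative paths now resolve under /tmp (or its realpath)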

oarphpy.util.misc.to_png_bytes(arr)[source]

Typically used for testing; when comparing images, we need to compare actual and expected via image bytes b/c imageio does some sort of subtle color normalization and we want our fixtures to simply be user-readable PNGs.

oarphpy.util.misc.to_jpeg_bytes(arr, quality=100)[source]

Given a numpy array image arr, return the image encoded as a jpeg buffer.

oarphpy.util.misc.get_jpeg_size(jpeg_bytes)[source]

Get the size of a JPEG image without reading and decompressing the entire file. Based upon:

oarphpy.util.misc.download(uri, dest, try_expand=True)[source]

Fetch uri, which is a file or archive, and put in dest, which is either a destination file path or destination directory.
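
A hedged sketch (the URL below is hypothetical):

>>> from oarphpy.util.misc import download
>>> download('https://example.com/data/archive.tar.gz', '/tmp/archive_contents')
>>> # With try_expand=True (the default), recognized archives are unpacked into dest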
