#
# Copyright (c) 2015, ParaTools, Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
# (1) Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# (2) Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# (3) Neither the name of ParaTools, Inc. nor the names of its contributors may
# be used to endorse or promote products derived from this software without
# specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
"""Draw progress indicators on the console.
Show bars or spinners, possibly with instantaneous CPU load average.
"""
import os
import sys
import threading
import itertools
from datetime import datetime, timedelta
from taucmdr import logger
from taucmdr.error import ConfigurationError
# Module-level logger, obtained from taucmdr's ``logger`` module under this module's name.
LOGGER = logger.get_logger(__name__)
def _read_proc_stat_cpu(path='/proc/stat'):
    """Read the CPU time counters from the first line of a ``/proc/stat``-style file.

    The first whitespace-separated token of the line (expected to be the
    ``cpu`` tag, see proc(5)) is skipped and the following values are mapped,
    in order, onto the names ``user``, ``nice``, ``sys``, ``idle``, ``iowait``,
    ``irq`` and ``sirq``.  Any further values on the line are ignored.

    Args:
        path (str): File to read.  Defaults to ``/proc/stat``.

    Returns:
        dict: Counter name to value (float), or an empty dict when the line
            holds fewer values than there are counter names.

    Raises:
        OSError: The file could not be opened or read.
        ValueError: One of the counter values is not a number.
    """
    fields = ('user', 'nice', 'sys', 'idle', 'iowait', 'irq', 'sirq')
    with open(path) as fin:
        cpu_line = fin.readline()
    # Skip the leading tag and convert only the values that will be used.
    values = [float(x) for x in cpu_line.split()[1:1 + len(fields)]]
    if len(values) < len(fields):
        # A partial dict would pass the caller's emptiness check and then fail
        # with KeyError on 'idle'/'iowait'; report "no data" instead.
        return {}
    return dict(zip(fields, values))
def _proc_stat_cpu_load_average():
    """Return the fraction of CPU time spent non-idle since the previous call.

    The previous sample is kept in the ``prev`` attribute of this function.
    On the first call that attribute is initialised from a fresh reading, and
    a second reading is then taken as the current sample.

    Returns:
        float: ``(total delta - idle delta) / total delta`` between the stored
            and the current sample, where idle time is ``idle + iowait``.
            Returns 0.0 when either sample is empty or the total did not change.
    """
    state = _proc_stat_cpu_load_average
    if not hasattr(state, 'prev'):
        state.prev = _read_proc_stat_cpu()
    before = state.prev
    after = _read_proc_stat_cpu()
    if not (before and after):
        # Nothing to compare; the stored sample is left untouched.
        return 0.0
    idle_before = before['idle'] + before['iowait']
    idle_after = after['idle'] + after['iowait']
    total_delta = sum(after.values()) - sum(before.values())
    idle_delta = idle_after - idle_before
    # The current sample becomes the baseline for the next call.
    state.prev = after
    if not total_delta:
        return 0.0
    return (total_delta - idle_delta) / total_delta
def load_average():
    """Calculate the CPU load average.

    Delegates to :any:`_proc_stat_cpu_load_average` and converts an `OSError`
    raised while reading the CPU counters into a None result.

    Returns:
        float: Load average since last time this routine was called
               or None if couldn't calculate load average.
    """
    try:
        return _proc_stat_cpu_load_average()
    except OSError:
        return None
class ProgressIndicator:
    """A fancy progress indicator to entertain antsy users.

    Draws one continuously rewritten status line on ``sys.stdout`` showing the
    label of the current phase, the seconds elapsed in it and a spinner,
    optionally followed by a CPU load bar and a completion bar with an
    estimated finish time.  Phases nest: starting a phase inside another prints
    the parent's label as a header line, and ending a phase prints its label
    together with its total duration.

    When `auto_refresh` is nonzero a daemon thread redraws the line every
    `auto_refresh` seconds, and :any:`push_phase` / :any:`pop_phase` block until
    that thread has drawn once.  Otherwise drawing happens synchronously in the
    calling thread.

    Instances are context managers: entering pushes `label` as the first phase
    and exiting calls :any:`complete`.
    """

    # Spinner frames.  The iterator is a class attribute, so all instances share it.
    _spinner = itertools.cycle(['-', '\\', '|', '/'])

    # Prefix repeated once per nesting level in front of a phase label.
    _indent = ' '

    def __init__(self, label, total_size=0, block_size=1, show_cpu=True, auto_refresh=0.25):
        """Initialize the indicator without drawing anything.

        The ``__TAUCMDR_PROGRESS_BARS__`` environment variable selects the mode
        (``full`` by default, or ``disabled``); ``disabled`` forces
        `auto_refresh` to 0.

        Args:
            label (str): Label of the first phase.
            total_size (int): Total amount of work; the completion bar is only shown when positive.
            block_size (int): Size of one work block.
            show_cpu (bool): Show the CPU load bar.  Forced to False when
                :any:`load_average` returns None.
            auto_refresh (float): Seconds between redraws by the refresh thread, 0 to disable the thread.

        Raises:
            ConfigurationError: ``__TAUCMDR_PROGRESS_BARS__`` has an unrecognized value.
        """
        mode = os.environ.get('__TAUCMDR_PROGRESS_BARS__', 'full').lower()
        if mode not in ('full', 'disabled'):
            raise ConfigurationError('Invalid value for __TAUCMDR_PROGRESS_BARS__ environment variable: %s' % mode)
        self.label = label
        self.count = 0
        self.total_size = total_size
        self.block_size = block_size
        self.show_cpu = show_cpu if load_average() is not None else False
        self.auto_refresh = auto_refresh if mode != 'disabled' else 0
        self._mode = mode
        # Printable columns still free on the line being drawn.
        self._line_remaining = 0
        # Stack of (label, start time, implicit) tuples.  A (None, end time, None)
        # entry is a pending "end of phase" marker consumed by _draw_phase_labels.
        self._phases = []
        self._phase_count = 0
        # Number of entries in _phases that _draw_phase_labels has already processed.
        self._phase_depth = 0
        # Index of the first entry that _draw_phase_labels still has to look at.
        self._phase_base = 0
        # Refresh thread and its synchronization objects; created on first push_phase.
        self._thread = None
        self._exiting = None
        self._updating = None

    def _thread_progress(self):
        """Body of the refresh thread: redraw every `auto_refresh` seconds until told to exit."""
        while not self._exiting.wait(self.auto_refresh):
            with self._updating:
                try:
                    self.update()
                finally:
                    # Wake a caller blocked in push_phase/pop_phase even if drawing failed.
                    self._updating.notify()

    def __enter__(self):
        self.push_phase(self.label)
        return self

    def __exit__(self, unused_exc_type, unused_exc_value, unused_traceback):
        self.complete()
        return False

    def _line_reset(self):
        """Return to the start of the line, write the line marker and reset the free-column count."""
        sys.stdout.write('\r')
        sys.stdout.write(logger.COLORED_LINE_MARKER)
        self._line_remaining = logger.LINE_WIDTH

    def _line_append(self, text):
        """Write `text` and deduct its length, measured after ``util.uncolor_text``, from the free columns."""
        from taucmdr import util
        sys.stdout.write(text)
        self._line_remaining -= len(util.uncolor_text(text))

    def _line_flush(self, newline=False):
        """Pad the line with spaces to its full width, optionally end it, and flush stdout."""
        self._line_append(' '*self._line_remaining)
        if newline:
            sys.stdout.write('\n')
        sys.stdout.flush()
        assert self._line_remaining == 0, str(self._line_remaining)

    def _print_line(self, text):
        """Write `text` as a complete, newline-terminated line."""
        self._line_reset()
        self._line_append(text)
        self._line_flush(newline=True)

    @staticmethod
    def _bar_extents(percent, width):
        """Split a bar of `width` cells into (filled, empty) cell counts.

        At least one cell is filled whenever the bar has any width, and the
        filled part never exceeds the bar.  A width below one yields ``(0, 0)``
        so a bar that does not fit cannot consume columns it was not given.
        """
        width = max(int(width), 0)
        filled = min(max(int(percent*width), 1), width)
        return filled, width - filled

    def _draw_bar(self, percent, width, char, *args, **kwargs):
        """Append a bar of `width` cells with the fraction `percent` drawn as `char`.

        Extra arguments are passed to ``util.color_text`` for the filled part.
        """
        from taucmdr import util
        bar_on, bar_off = self._bar_extents(percent, width)
        self._line_append(util.color_text(char*bar_on, *args, **kwargs))
        self._line_append(' '*bar_off)

    def _draw_phase_labels(self):
        """Print lines for phases started or ended since the last draw and rebuild the stack.

        Entries from `_phase_base` onwards are examined in order.  A phase that
        is directly followed by another phase gets its label printed as a
        header line.  An end marker removes the innermost open phase and prints
        that phase's label with its duration.  Afterwards `_phases` holds only
        the phases that are still open.
        """
        start = self._phase_base
        printed_phases = self._phases[:start]
        last = len(self._phases) - 1
        for i, (label, timestamp, implicit) in enumerate(self._phases[start:], start):
            if label is None:
                # End marker: close the innermost open phase.
                label, tstart, _ = printed_phases.pop()
                tdelta = (timestamp - tstart).total_seconds()
                self._print_line(f"{label} [{tdelta:0.3f} seconds]")
                continue
            # The final entry becomes the live line, so it never gets a header here.
            if i < last and self._phases[i+1][0] is not None:
                self._print_line("%s:" % label)
            printed_phases.append((label, timestamp, implicit))
        self._phases = printed_phases
        self._phase_depth = len(printed_phases)
        # Never let the base exceed the stack: a stale, too-large base would put a
        # later end marker (or newly pushed phase) inside the "already printed" prefix.
        self._phase_base = min(max(self._phase_base, self._phase_depth-1), self._phase_depth)

    def _start_refresh_thread(self):
        """Create and start the daemon refresh thread if it is not running yet."""
        if self._thread is not None:
            return
        self._exiting = threading.Event()
        self._updating = threading.Condition()
        self._thread = threading.Thread(target=self._thread_progress)
        self._thread.daemon = True
        self._thread.start()

    def _begin_phase(self, label, implicit):
        """Record the start of phase `label`, first ending an implicit phase on top of the stack.

        No new phase is recorded when the top of the stack is an end marker or
        already carries the same label (ignoring surrounding whitespace).
        """
        if self._phases:
            top_label, _, top_implicit = self._phases[-1]
            new_phase = top_label is not None and top_label.strip() != label
            if top_implicit:
                self.pop_phase()
        else:
            new_phase = True
        if new_phase:
            # Indent by the depth as it is after any implicit phase was popped.
            label = (self._phase_depth*self._indent) + label
            self._phases.append((label, datetime.now(), implicit))

    def push_phase(self, label, implicit=False):
        """Begin a new, possibly nested, phase.

        With auto-refresh the refresh thread is started on first use and the
        call blocks until that thread has drawn once.

        Args:
            label (str): Label of the phase.
            implicit (bool): If True the phase is ended automatically by the next `push_phase`.
        """
        if not self.auto_refresh:
            self._begin_phase(label, implicit)
            self.update()
            return
        self._start_refresh_thread()
        with self._updating:
            self._begin_phase(label, implicit)
            self._updating.wait()

    def pop_phase(self):
        """End the innermost phase.

        With auto-refresh the call blocks until the refresh thread has drawn
        once.  It does nothing if the refresh thread was never started.
        """
        if not self.auto_refresh:
            if self._phases:
                self._phases.append((None, datetime.now(), None))
            self.update()
            return
        if self._thread is None:
            # No phase was ever pushed, so there is nothing to end and no thread to wait for.
            return
        with self._updating:
            if self._phases:
                self._phases.append((None, datetime.now(), None))
            self._updating.wait()

    def phase(self, label):
        """Begin an implicit phase, i.e. one that the next `push_phase` ends automatically."""
        self.push_phase(label, True)

    def increment(self, count=1):
        """Add `count` to the number of completed blocks without redrawing."""
        self.count += count

    def _draw_cpu_load(self, show_bar):
        """Append the CPU load section; it takes a quarter of the free columns if a completion bar follows."""
        cpu_load = load_average()
        if cpu_load is None:
            # The load average became unavailable after construction; skip the section.
            return
        cpu_load = min(cpu_load, 1.0)
        self._line_append("[CPU: %0.1f " % (100*cpu_load))
        width = (self._line_remaining//4) if show_bar else (self._line_remaining-2)
        self._draw_bar(cpu_load, width, '|', 'white', 'on_white')
        self._line_append("]")

    def _draw_completion(self, tdelta):
        """Append the completion bar and estimated finish time, if they fit on the line.

        Args:
            tdelta (float): Seconds elapsed in the current phase, used to extrapolate the finish time.
        """
        completed = float(self.count*self.block_size)
        percent = max(min(completed / self.total_size, 1.0), 0.0)
        percent_text = "[%0.1f%% " % (100*percent)
        if completed == 0:
            eta = '(unknown)'
        else:
            time_remaining = (tdelta / completed) * (self.total_size - completed)
            etadate = datetime.now() + timedelta(seconds=time_remaining)
            eta = '%s-%s-%s %02d:%02d' % (
                etadate.year, etadate.month, etadate.day, etadate.hour, etadate.minute)
        # Columns left for the bar once the separator, percentage and ETA are accounted for.
        width = self._line_remaining - 1 - len(percent_text) - 4 - len(eta)
        if width < 1:
            # Too narrow: drawing anyway would overrun the line.
            return
        self._line_append(" ")
        self._line_append(percent_text)
        self._draw_bar(percent, width, '>', 'green', 'on_green')
        self._line_append("] %s" % eta)

    def update(self, count=None, block_size=None, total_size=None):
        """Show progress.

        Updates `block_size` or `total_size` if given for compatibility with :any:`urllib.urlretrieve`.

        With auto-refresh only the refresh thread draws; a call from any other
        thread just records the new values (pushing the initial phase if none
        exists yet) and returns.

        Args:
            count (int): Number of blocks of `block_size` that have been completed.
            block_size (int): Size of a work block.
            total_size (int): Total amount of work to be completed.
        """
        if count is not None:
            self.count = count
        if block_size is not None:
            self.block_size = block_size
        if total_size is not None:
            self.total_size = total_size
        if self.auto_refresh:
            if threading.current_thread() is not self._thread:
                if not self._phases:
                    self.push_phase(self.label)
                return
        elif not self._phases:
            # push_phase draws by itself when auto-refresh is off.
            self.push_phase(self.label)
            return
        if self._phase_depth != len(self._phases):
            self._draw_phase_labels()
        if not self._phases:
            return
        label, tstart, _ = self._phases[-1]
        tdelta = (datetime.now() - tstart).total_seconds()
        self._line_reset()
        if label == "":
            self._line_append("{:0.1f} seconds {}".format(tdelta, next(self._spinner)))
        else:
            self._line_append("{}: {:0.1f} seconds {}".format(label, tdelta, next(self._spinner)))
        show_bar = self.total_size > 0
        if self.show_cpu and self._line_remaining > 40:
            self._draw_cpu_load(show_bar)
        if show_bar and self._line_remaining > 20:
            self._draw_completion(tdelta)
        self._line_flush()

    def complete(self):
        """End every open phase and stop the refresh thread.

        Safe to call when no phase was ever pushed and safe to call more than
        once.  The thread state is cleared afterwards so that a later
        `push_phase` starts a fresh refresh thread.
        """
        for _ in range(len(self._phases)):
            self.pop_phase()
        if self.auto_refresh:
            if self._thread is not None:
                self._exiting.set()
                self._thread.join()
                self._thread = None
                self._exiting = None
                self._updating = None
        elif self._phases:
            # Redraw only if something is still open; update() on an empty stack
            # would push the initial phase again.
            self.update()