# Copyright (C) 2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from oio.cli import Lister, ShowOne, flat_dict_from_dict
from oio.cli.admin.xcute import XcuteCommand
[docs]class JobList(XcuteCommand, Lister):
"""
List all jobs.
"""
columns = ('ID', 'Status', 'Type', 'ctime', 'mtime')
def _take_action(self, parsed_args):
jobs = self.xcute.job_list()
for job_info in jobs:
job_main_info = job_info['job']
yield (job_main_info['id'], job_main_info['status'],
job_main_info['type'], job_main_info['ctime'],
job_main_info['mtime'])
[docs] def take_action(self, parsed_args):
self.logger.debug('take_action(%s)', parsed_args)
return self.columns, self._take_action(parsed_args)
[docs]class JobShow(XcuteCommand, ShowOne):
"""
Get all information about the job.
"""
[docs] def get_parser(self, prog_name):
parser = super(JobShow, self).get_parser(prog_name)
parser.add_argument(
'job_id',
metavar='<job_id>',
help=("Job ID to show"))
parser.add_argument(
'--raw',
action='store_true',
help='Display raw information')
return parser
[docs] def take_action(self, parsed_args):
self.logger.debug('take_action(%s)', parsed_args)
job_info = self.xcute.job_show(parsed_args.job_id)
if not parsed_args.raw:
job_main_info = job_info['job']
duration = job_main_info['mtime'] - job_main_info['ctime']
job_main_info['duration'] = duration
job_tasks = job_info['tasks']
job_tasks['sent_percent'] = \
job_tasks['sent'] * 100. / (job_tasks['total'] or 0.00001)
job_tasks['processed_per_second'] = \
job_tasks['processed'] / (duration or 0.00001)
job_tasks['processed_percent'] = \
job_tasks['processed'] * 100. / (job_tasks['total'] or 0.00001)
if parsed_args.formatter == 'table':
if not job_tasks['all_sent']:
if job_tasks['is_total_temp']:
total_state = 'estimating'
else:
total_state = 'estimated'
job_tasks['total'] = "%d (%s)" % (
job_tasks['total'], total_state)
job_info.pop('orchestrator', None)
job_main_info.pop('request_pause', None)
job_tasks.pop('all_sent', None)
job_tasks.pop('last_sent', None)
job_tasks.pop('is_total_temp', None)
job_tasks.pop('total_marker', None)
return zip(*sorted(
flat_dict_from_dict(parsed_args, job_info).items()))
[docs]class JobPause(XcuteCommand, Lister):
"""
Pause the jobs.
"""
columns = ('ID', 'Paused')
[docs] def get_parser(self, prog_name):
parser = super(JobPause, self).get_parser(prog_name)
parser.add_argument(
'job_ids',
nargs='+',
metavar='<job_id>',
help=("Job IDs to pause"))
return parser
def _take_action(self, parsed_args):
for job_id in parsed_args.job_ids:
paused = True
try:
self.xcute.job_pause(job_id)
except Exception as exc:
self.logger.error('Failed to paused job %s: %s',
job_id, exc)
paused = False
yield (job_id, paused)
[docs] def take_action(self, parsed_args):
self.logger.debug('take_action(%s)', parsed_args)
return self.columns, self._take_action(parsed_args)
[docs]class JobResume(XcuteCommand, Lister):
"""
Resume the jobs.
"""
columns = ('ID', 'Resumed')
[docs] def get_parser(self, prog_name):
parser = super(JobResume, self).get_parser(prog_name)
parser.add_argument(
'job_ids',
nargs='+',
metavar='<job_id>',
help=("Job IDs to resume"))
return parser
def _take_action(self, parsed_args):
for job_id in parsed_args.job_ids:
resumed = True
try:
self.xcute.job_resume(job_id)
except Exception as exc:
self.logger.error('Failed to resumed job %s: %s',
job_id, exc)
resumed = False
yield (job_id, resumed)
[docs] def take_action(self, parsed_args):
self.logger.debug('take_action(%s)', parsed_args)
return self.columns, self._take_action(parsed_args)
[docs]class JobDelete(XcuteCommand, Lister):
"""
Delete all information about the jobs.
"""
columns = ('ID', 'Deleted')
[docs] def get_parser(self, prog_name):
parser = super(JobDelete, self).get_parser(prog_name)
parser.add_argument(
'job_ids',
nargs='+',
metavar='<job_id>',
help=("Job IDs to delete"))
return parser
def _take_action(self, parsed_args):
for job_id in parsed_args.job_ids:
deleted = True
try:
self.xcute.job_delete(job_id)
except Exception as exc:
self.logger.error('Failed to deleted job %s: %s',
job_id, exc)
deleted = False
yield (job_id, deleted)
[docs] def take_action(self, parsed_args):
self.logger.debug('take_action(%s)', parsed_args)
return self.columns, self._take_action(parsed_args)