# Source code for oio.cli.events.events
# Copyright (C) 2015-2019 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from logging import getLogger
from oio.cli import ShowOne
class StatsEvents(ShowOne):
    """Stats events"""

    log = getLogger(__name__ + '.StatsEvents')

    def get_parser(self, prog_name):
        """Build the argument parser for this command.

        Extends the parser produced by the parent class with an optional
        ``--tube`` argument.

        :param prog_name: program name, forwarded to the parent parser.
        :returns: the parent's parser with ``--tube`` registered on it.
        """
        parser = super(StatsEvents, self).get_parser(prog_name)
        parser.add_argument(
            '--tube',
            metavar='<tube>',
            help='Tube name')
        return parser

    def take_action(self, parsed_args):
        """Fetch event statistics and lay them out as names and values.

        :param parsed_args: parsed command line; ``parsed_args.tube`` is
            passed unchanged to the event client's ``stats()`` call.
        :returns: a two-element list ``[names, values]`` where ``names``
            is the tuple of statistic keys in sorted order and ``values``
            the tuple of matching values. Both tuples are empty when no
            statistics are returned.
        """
        self.log.debug('take_action(%s)', parsed_args)
        data = self.app.client_manager.event.stats(parsed_args.tube)
        if not data:
            # zip(*[]) produces nothing, so the expression below would
            # collapse to [] instead of a (names, values) pair; keep the
            # two-element shape even when there is nothing to show.
            return [(), ()]
        # Sort the (key, value) pairs, then transpose them into one tuple
        # of keys and one tuple of values.
        return list(zip(*sorted(data.items())))
class EventsExhume(ShowOne):
    """
    Exhume (replay) events that have been buried.
    """

    log = getLogger(__name__ + '.EventsExhume')

    def get_parser(self, prog_name):
        """Build the argument parser for this command.

        Registers ``--tube`` (default ``"oio"``) and ``--limit`` (integer,
        default ``1000``) on top of the parser supplied by the parent class.

        :param prog_name: program name, forwarded to the parent parser.
        :returns: the parent's parser with both options registered.
        """
        exhume_parser = super(EventsExhume, self).get_parser(prog_name)
        # Options are declared as data and registered in order below.
        option_specs = (
            ('--tube', {
                'default': "oio",
                'help': 'Name of the tube to interact with '
                        '(defaults to "oio")',
            }),
            ('--limit', {
                'type': int,
                'default': 1000,
                'help': 'Maximum number of events to exhume '
                        '(defaults to 1000)',
            }),
        )
        for flag, settings in option_specs:
            exhume_parser.add_argument(flag, **settings)
        return exhume_parser

    def take_action(self, parsed_args):
        """Exhume events through the event client and report the result.

        :param parsed_args: parsed command line; ``limit`` and ``tube``
            are passed, in that order, to the event client's ``exhume()``.
        :returns: a two-element list: the single column name
            ``"Exhumed"`` and a one-item tuple holding the value returned
            by ``exhume()``.
        """
        self.log.debug('take_action(%s)', parsed_args)
        event_client = self.app.client_manager.event
        exhumed = event_client.exhume(parsed_args.limit, parsed_args.tube)
        header = ("Exhumed",)
        return [header, (exhumed,)]