Skip to content

Commit

Permalink
Merge branch 'master' into 8.x
Browse files Browse the repository at this point in the history
  • Loading branch information
untergeek committed Oct 26, 2024
2 parents 15234e4 + d80a7a5 commit a40706f
Show file tree
Hide file tree
Showing 17 changed files with 2,017 additions and 768 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ localhost.es
oneliners.py
cacert.pem
docs/
docker_test/.env
docker_test/repo/
docker_test/curatortestenv
docker_test/scripts/Dockerfile
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1
ARG PYVER=3.11.9
ARG ALPTAG=3.19
ARG ALPTAG=3.20
FROM python:${PYVER}-alpine${ALPTAG} as builder

# Add the community repo for access to patchelf binary package
Expand Down
2 changes: 1 addition & 1 deletion curator/_version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
"""Curator Version"""

__version__ = '8.0.16'
__version__ = '8.0.17'
298 changes: 192 additions & 106 deletions curator/actions/shrink.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions curator/defaults/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CLICK_DRYRUN = {
'dry-run': {'help': 'Do not perform any changes.', 'is_flag': True},
}
DATA_NODE_ROLES = ['data', 'data_content', 'data_hot', 'data_warm']

# Click specifics

Expand Down
6 changes: 5 additions & 1 deletion curator/helpers/date_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,15 @@ def get_epoch(self, searchme):
:returns: The epoch timestamp extracted from ``searchme`` by regex matching
against :py:attr:`pattern`
:rtype: int
:rtype: int or None
"""
match = self.pattern.search(searchme)
if match:
if match.group("date"):
timestamp = match.group("date")
return datetime_to_epoch(get_datetime(timestamp, self.timestring))
return None
return None


def absolute_date_range(
Expand Down Expand Up @@ -161,6 +163,8 @@ def date_range(unit, range_from, range_to, epoch=None, week_starts_on='sunday'):
:rtype: tuple
"""
logger = logging.getLogger(__name__)
start_date = None
start_delta = None
acceptable_units = ['hours', 'days', 'weeks', 'months', 'years']
if unit not in acceptable_units:
raise ConfigurationError(f'"unit" must be one of: {acceptable_units}')
Expand Down
127 changes: 95 additions & 32 deletions curator/helpers/getters.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
"""Utility functions that get things"""

import logging
import re
from elasticsearch8 import exceptions as es8exc
from curator.exceptions import (
ConfigurationError, CuratorException, FailedExecution, MissingArgument)
ConfigurationError,
CuratorException,
FailedExecution,
MissingArgument,
)


def byte_size(num, suffix='B'):
"""
Expand All @@ -23,6 +28,23 @@ def byte_size(num, suffix='B'):
num /= 1024.0
return f'{num:.1f}Y{suffix}'


def escape_dots(stringval):
"""
Escape any dots (periods) in ``stringval``.
Primarily used for ``filter_path`` where dots are indicators of path nesting
:param stringval: A string, ostensibly an index name
:type stringval: str
:returns: ``stringval``, but with any periods escaped with a backslash
:retval: str
"""
return stringval.replace('.', r'\.')


def get_alias_actions(oldidx, newidx, aliases):
"""
:param oldidx: The old index name
Expand All @@ -34,7 +56,8 @@ def get_alias_actions(oldidx, newidx, aliases):
:type aliases: dict
:returns: A list of actions suitable for
:py:meth:`~.elasticsearch.client.IndicesClient.update_aliases` ``actions`` kwarg.
:py:meth:`~.elasticsearch.client.IndicesClient.update_aliases` ``actions``
kwarg.
:rtype: list
"""
actions = []
Expand All @@ -43,29 +66,40 @@ def get_alias_actions(oldidx, newidx, aliases):
actions.append({'add': {'index': newidx, 'alias': alias}})
return actions


def get_data_tiers(client):
"""
Get all valid data tiers from the node roles of each node in the cluster by polling each node
Get all valid data tiers from the node roles of each node in the cluster by
polling each node
:param client: A client connection object
:type client: :py:class:`~.elasticsearch.Elasticsearch`
:returns: The available data tiers in ``tier: bool`` form.
:rtype: dict
"""

def role_check(role, node_info):
if role in node_info['roles']:
return True
return False

info = client.nodes.info()['nodes']
retval = {'data_hot': False, 'data_warm': False, 'data_cold': False, 'data_frozen': False}
retval = {
'data_hot': False,
'data_warm': False,
'data_cold': False,
'data_frozen': False,
}
for node in info:
for role in ['data_hot', 'data_warm', 'data_cold', 'data_frozen']:
# This guarantees we don't overwrite a True with a False. We only add True values
# This guarantees we don't overwrite a True with a False.
# We only add True values
if role_check(role, info[node]):
retval[role] = True
return retval


def get_indices(client, search_pattern='_all'):
"""
Calls :py:meth:`~.elasticsearch.client.CatClient.indices`
Expand All @@ -79,10 +113,14 @@ def get_indices(client, search_pattern='_all'):
logger = logging.getLogger(__name__)
indices = []
try:
# Doing this in two stages because IndexList also calls for these args, and the unit tests
# need to Mock this call the same exact way.
# Doing this in two stages because IndexList also calls for these args,
# and the unit tests need to Mock this call the same exact way.
resp = client.cat.indices(
index=search_pattern, expand_wildcards='open,closed', h='index,status', format='json')
index=search_pattern,
expand_wildcards='open,closed',
h='index,status',
format='json',
)
except Exception as err:
raise FailedExecution(f'Failed to get indices. Error: {err}') from err
if not resp:
Expand All @@ -92,6 +130,7 @@ def get_indices(client, search_pattern='_all'):
logger.debug('All indices: %s', indices)
return indices


def get_repository(client, repository=''):
"""
Calls :py:meth:`~.elasticsearch.client.SnapshotClient.get_repository`
Expand All @@ -114,6 +153,7 @@ def get_repository(client, repository=''):
)
raise CuratorException(msg) from err


def get_snapshot(client, repository=None, snapshot=''):
"""
Calls :py:meth:`~.elasticsearch.client.SnapshotClient.get`
Expand All @@ -126,9 +166,10 @@ def get_snapshot(client, repository=None, snapshot=''):
:type repository: str
:type snapshot: str
:returns: Information about the provided ``snapshot``, a snapshot (or a comma-separated list of
snapshots). If no snapshot specified, it will collect info for all snapshots. If none
exist, an empty :py:class:`dict` will be returned.
:returns: Information about the provided ``snapshot``, a snapshot (or a
comma-separated list of snapshots). If no snapshot specified, it will
collect info for all snapshots. If none exist, an empty :py:class:`dict`
will be returned.
:rtype: dict
"""
if not repository:
Expand All @@ -143,6 +184,7 @@ def get_snapshot(client, repository=None, snapshot=''):
)
raise FailedExecution(msg) from err


def get_snapshot_data(client, repository=None):
"""
Get all snapshots from repository and return a list.
Expand All @@ -168,6 +210,7 @@ def get_snapshot_data(client, repository=None):
)
raise FailedExecution(msg) from err


def get_tier_preference(client, target_tier='data_frozen'):
"""Do the tier preference thing in reverse order from coldest to hottest
Based on the value of ``target_tier``, build out the list to use.
Expand All @@ -194,8 +237,8 @@ def get_tier_preference(client, target_tier='data_frozen'):
if tier in tiers and tiermap[tier] <= tiermap[target_tier]:
test_list.insert(0, tier)
if target_tier == 'data_frozen':
# We're migrating to frozen here. If a frozen tier exists, frozen searchable snapshot
# mounts should only ever go to the frozen tier.
# We're migrating to frozen here. If a frozen tier exists, frozen searchable
# snapshot mounts should only ever go to the frozen tier.
if 'data_frozen' in tiers and tiers['data_frozen']:
return 'data_frozen'
# If there are no nodes with the 'data_frozen' role...
Expand All @@ -207,9 +250,11 @@ def get_tier_preference(client, target_tier='data_frozen'):
# If all of these are false, then we have no data tiers and must use 'data_content'
if not preflist:
return 'data_content'
# This will join from coldest to hottest as csv string, e.g. 'data_cold,data_warm,data_hot'
# This will join from coldest to hottest as csv string,
# e.g. 'data_cold,data_warm,data_hot'
return ','.join(preflist)


def get_write_index(client, alias):
"""
Calls :py:meth:`~.elasticsearch.client.IndicesClient.get_alias`
Expand All @@ -220,7 +265,8 @@ def get_write_index(client, alias):
:type client: :py:class:`~.elasticsearch.Elasticsearch`
:type alias: str
:returns: The the index name associated with the alias that is designated ``is_write_index``
:returns: The the index name associated with the alias that is designated
``is_write_index``
:rtype: str
"""
try:
Expand All @@ -229,17 +275,21 @@ def get_write_index(client, alias):
raise CuratorException(f'Alias {alias} not found') from exc
# If there are more than one in the list, one needs to be the write index
# otherwise the alias is a one to many, and can't do rollover.
retval = None
if len(list(response.keys())) > 1:
for index in list(response.keys()):
try:
if response[index]['aliases'][alias]['is_write_index']:
return index
retval = index
except KeyError as exc:
raise FailedExecution(
'Invalid alias: is_write_index not found in 1 to many alias') from exc
'Invalid alias: is_write_index not found in 1 to many alias'
) from exc
else:
# There's only one, so this is it
return list(response.keys())[0]
retval = list(response.keys())[0]
return retval


def index_size(client, idx, value='total'):
"""
Expand All @@ -256,7 +306,11 @@ def index_size(client, idx, value='total'):
:returns: The sum of either ``primaries`` or ``total`` shards for index ``idx``
:rtype: integer
"""
return client.indices.stats(index=idx)['indices'][idx][value]['store']['size_in_bytes']
fpath = f'indices.{escape_dots(idx)}.{value}.store.size_in_bytes'
return client.indices.stats(index=idx, filter_path=fpath)['indices'][idx][value][
'store'
]['size_in_bytes']


def meta_getter(client, idx, get=None):
"""Meta Getter
Expand Down Expand Up @@ -297,9 +351,10 @@ def meta_getter(client, idx, get=None):
logger.error('Exception encountered: %s', exc)
return retval


def name_to_node_id(client, name):
"""
Calls :py:meth:`~.elasticsearch.client.NodesClient.stats`
Calls :py:meth:`~.elasticsearch.client.NodesClient.info`
:param client: A client connection object
:param name: The node ``name``
Expand All @@ -311,17 +366,19 @@ def name_to_node_id(client, name):
:rtype: str
"""
logger = logging.getLogger(__name__)
stats = client.nodes.stats()
for node in stats['nodes']:
if stats['nodes'][node]['name'] == name:
fpath = 'nodes'
info = client.nodes.info(filter_path=fpath)
for node in info['nodes']:
if info['nodes'][node]['name'] == name:
logger.debug('Found node_id "%s" for name "%s".', node, name)
return node
logger.error('No node_id found matching name: "%s"', name)
return None


def node_id_to_name(client, node_id):
"""
Calls :py:meth:`~.elasticsearch.client.NodesClient.stats`
Calls :py:meth:`~.elasticsearch.client.NodesClient.info`
:param client: A client connection object
:param node_id: The node ``node_id``
Expand All @@ -333,15 +390,17 @@ def node_id_to_name(client, node_id):
:rtype: str
"""
logger = logging.getLogger(__name__)
stats = client.nodes.stats()
fpath = f'nodes.{node_id}.name'
info = client.nodes.info(filter_path=fpath)
name = None
if node_id in stats['nodes']:
name = stats['nodes'][node_id]['name']
if node_id in info['nodes']:
name = info['nodes'][node_id]['name']
else:
logger.error('No node_id found matching: "%s"', node_id)
logger.debug('Name associated with node_id "%s": %s', node_id, name)
return name


def node_roles(client, node_id):
"""
Calls :py:meth:`~.elasticsearch.client.NodesClient.info`
Expand All @@ -355,12 +414,14 @@ def node_roles(client, node_id):
:returns: The list of roles assigned to the node identified by ``node_id``
:rtype: list
"""
return client.nodes.info()['nodes'][node_id]['roles']
fpath = f'nodes.{node_id}.roles'
return client.nodes.info(filter_path=fpath)['nodes'][node_id]['roles']


def single_data_path(client, node_id):
"""
In order for a shrink to work, it should be on a single filesystem, as shards cannot span
filesystems. Calls :py:meth:`~.elasticsearch.client.NodesClient.stats`
In order for a shrink to work, it should be on a single filesystem, as shards
cannot span filesystems. Calls :py:meth:`~.elasticsearch.client.NodesClient.stats`
:param client: A client connection object
:param node_id: The node ``node_id``
Expand All @@ -371,4 +432,6 @@ def single_data_path(client, node_id):
:returns: ``True`` if the node has a single filesystem, else ``False``
:rtype: bool
"""
return len(client.nodes.stats()['nodes'][node_id]['fs']['data']) == 1
fpath = f'nodes.{node_id}.fs.data'
response = client.nodes.stats(filter_path=fpath)
return len(response['nodes'][node_id]['fs']['data']) == 1
Loading

0 comments on commit a40706f

Please sign in to comment.