# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # cython: language_level = 3 import sys from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE from cython.operator cimport dereference as deref from collections import namedtuple from pyarrow.lib import frombytes, tobytes, ordered_dict, ArrowInvalid from pyarrow.lib cimport * from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * import pyarrow.lib as lib from libcpp cimport bool as c_bool import inspect import numpy as np cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ scalar Function in a ScalarFunction object. """ cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction) func.init(sp_func) return func cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ vector Function in a VectorFunction object. """ cdef VectorFunction func = VectorFunction.__new__(VectorFunction) func.init(sp_func) return func cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ aggregate Function in a ScalarAggregateFunction object. """ cdef ScalarAggregateFunction func = \ ScalarAggregateFunction.__new__(ScalarAggregateFunction) func.init(sp_func) return func cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ aggregate Function in a HashAggregateFunction object. """ cdef HashAggregateFunction func = \ HashAggregateFunction.__new__(HashAggregateFunction) func.init(sp_func) return func cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ meta Function in a MetaFunction object. """ cdef MetaFunction func = MetaFunction.__new__(MetaFunction) func.init(sp_func) return func cdef wrap_function(const shared_ptr[CFunction]& sp_func): """ Wrap a C++ Function in a Function object. This dispatches to specialized wrappers depending on the function kind. """ if sp_func.get() == NULL: raise ValueError("Function was NULL") cdef FunctionKind c_kind = sp_func.get().kind() if c_kind == FunctionKind_SCALAR: return wrap_scalar_function(sp_func) elif c_kind == FunctionKind_VECTOR: return wrap_vector_function(sp_func) elif c_kind == FunctionKind_SCALAR_AGGREGATE: return wrap_scalar_aggregate_function(sp_func) elif c_kind == FunctionKind_HASH_AGGREGATE: return wrap_hash_aggregate_function(sp_func) elif c_kind == FunctionKind_META: return wrap_meta_function(sp_func) else: raise NotImplementedError("Unknown Function::Kind") cdef wrap_scalar_kernel(const CScalarKernel* c_kernel): if c_kernel == NULL: raise ValueError("Kernel was NULL") cdef ScalarKernel kernel = ScalarKernel.__new__(ScalarKernel) kernel.init(c_kernel) return kernel cdef wrap_vector_kernel(const CVectorKernel* c_kernel): if c_kernel == NULL: raise ValueError("Kernel was NULL") cdef VectorKernel kernel = VectorKernel.__new__(VectorKernel) kernel.init(c_kernel) return kernel cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel): if c_kernel == NULL: raise ValueError("Kernel was NULL") cdef ScalarAggregateKernel kernel = \ ScalarAggregateKernel.__new__(ScalarAggregateKernel) kernel.init(c_kernel) return kernel cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel): if c_kernel == NULL: raise ValueError("Kernel was NULL") cdef HashAggregateKernel kernel = \ HashAggregateKernel.__new__(HashAggregateKernel) kernel.init(c_kernel) return kernel cdef class Kernel(_Weakrefable): """ A kernel object. Kernels handle the execution of a Function for a certain signature. """ def __init__(self): raise TypeError("Do not call {}'s constructor directly" .format(self.__class__.__name__)) cdef class ScalarKernel(Kernel): cdef const CScalarKernel* kernel cdef void init(self, const CScalarKernel* kernel) except *: self.kernel = kernel def __repr__(self): return ("ScalarKernel<{}>" .format(frombytes(self.kernel.signature.get().ToString()))) cdef class VectorKernel(Kernel): cdef const CVectorKernel* kernel cdef void init(self, const CVectorKernel* kernel) except *: self.kernel = kernel def __repr__(self): return ("VectorKernel<{}>" .format(frombytes(self.kernel.signature.get().ToString()))) cdef class ScalarAggregateKernel(Kernel): cdef const CScalarAggregateKernel* kernel cdef void init(self, const CScalarAggregateKernel* kernel) except *: self.kernel = kernel def __repr__(self): return ("ScalarAggregateKernel<{}>" .format(frombytes(self.kernel.signature.get().ToString()))) cdef class HashAggregateKernel(Kernel): cdef const CHashAggregateKernel* kernel cdef void init(self, const CHashAggregateKernel* kernel) except *: self.kernel = kernel def __repr__(self): return ("HashAggregateKernel<{}>" .format(frombytes(self.kernel.signature.get().ToString()))) FunctionDoc = namedtuple( "FunctionDoc", ("summary", "description", "arg_names", "options_class", "options_required")) cdef class Function(_Weakrefable): """ A compute function. A function implements a certain logical computation over a range of possible input signatures. Each signature accepts a range of input types and is implemented by a given Kernel. Functions can be of different kinds: * "scalar" functions apply an item-wise computation over all items of their inputs. Each item in the output only depends on the values of the inputs at the same position. Examples: addition, comparisons, string predicates... * "vector" functions apply a collection-wise computation, such that each item in the output may depend on the values of several items in each input. Examples: dictionary encoding, sorting, extracting unique values... * "scalar_aggregate" functions reduce the dimensionality of the inputs by applying a reduction function. Examples: sum, min_max, mode... * "hash_aggregate" functions apply a reduction function to an input subdivided by grouping criteria. They may not be directly called. Examples: hash_sum, hash_min_max... * "meta" functions dispatch to other functions. """ cdef: shared_ptr[CFunction] sp_func CFunction* base_func _kind_map = { FunctionKind_SCALAR: "scalar", FunctionKind_VECTOR: "vector", FunctionKind_SCALAR_AGGREGATE: "scalar_aggregate", FunctionKind_HASH_AGGREGATE: "hash_aggregate", FunctionKind_META: "meta", } def __init__(self): raise TypeError("Do not call {}'s constructor directly" .format(self.__class__.__name__)) cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: self.sp_func = sp_func self.base_func = sp_func.get() def __repr__(self): return ("arrow.compute.Function" .format(self.name, self.kind, self.arity, self.num_kernels)) def __reduce__(self): # Reduction uses the global registry return get_function, (self.name,) @property def name(self): """ The function name. """ return frombytes(self.base_func.name()) @property def arity(self): """ The function arity. If Ellipsis (i.e. `...`) is returned, the function takes a variable number of arguments. """ cdef CArity arity = self.base_func.arity() if arity.is_varargs: return ... else: return arity.num_args @property def kind(self): """ The function kind. """ cdef FunctionKind c_kind = self.base_func.kind() try: return self._kind_map[c_kind] except KeyError: raise NotImplementedError("Unknown Function::Kind") @property def _doc(self): """ The C++-like function documentation (for internal use). """ cdef CFunctionDoc c_doc = self.base_func.doc() return FunctionDoc(frombytes(c_doc.summary), frombytes(c_doc.description), [frombytes(s) for s in c_doc.arg_names], frombytes(c_doc.options_class), c_doc.options_required) @property def num_kernels(self): """ The number of kernels implementing this function. """ return self.base_func.num_kernels() def call(self, args, FunctionOptions options=None, MemoryPool memory_pool=None, length=None): """ Call the function on the given arguments. Parameters ---------- args : iterable The arguments to pass to the function. Accepted types depend on the specific function. options : FunctionOptions, optional Options instance for executing this function. This should have the right concrete options type. memory_pool : pyarrow.MemoryPool, optional If not passed, will allocate memory from the default memory pool. length : int, optional Batch size for execution, for nullary (no argument) functions. If not passed, will be inferred from passed data. """ cdef: const CFunctionOptions* c_options = NULL CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) CExecContext c_exec_ctx = CExecContext(pool) CExecBatch c_batch CDatum result _pack_compute_args(args, &c_batch.values) if options is not None: c_options = options.get_options() if length is not None: c_batch.length = length with nogil: result = GetResultValue( self.base_func.Execute(c_batch, c_options, &c_exec_ctx) ) else: with nogil: result = GetResultValue( self.base_func.Execute(c_batch.values, c_options, &c_exec_ctx) ) return wrap_datum(result) cdef class ScalarFunction(Function): cdef const CScalarFunction* func cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: Function.init(self, sp_func) self.func = sp_func.get() @property def kernels(self): """ The kernels implementing this function. """ cdef vector[const CScalarKernel*] kernels = self.func.kernels() return [wrap_scalar_kernel(k) for k in kernels] cdef class VectorFunction(Function): cdef const CVectorFunction* func cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: Function.init(self, sp_func) self.func = sp_func.get() @property def kernels(self): """ The kernels implementing this function. """ cdef vector[const CVectorKernel*] kernels = self.func.kernels() return [wrap_vector_kernel(k) for k in kernels] cdef class ScalarAggregateFunction(Function): cdef const CScalarAggregateFunction* func cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: Function.init(self, sp_func) self.func = sp_func.get() @property def kernels(self): """ The kernels implementing this function. """ cdef vector[const CScalarAggregateKernel*] kernels = \ self.func.kernels() return [wrap_scalar_aggregate_kernel(k) for k in kernels] cdef class HashAggregateFunction(Function): cdef const CHashAggregateFunction* func cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: Function.init(self, sp_func) self.func = sp_func.get() @property def kernels(self): """ The kernels implementing this function. """ cdef vector[const CHashAggregateKernel*] kernels = self.func.kernels() return [wrap_hash_aggregate_kernel(k) for k in kernels] cdef class MetaFunction(Function): cdef const CMetaFunction* func cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: Function.init(self, sp_func) self.func = sp_func.get() # Since num_kernels is exposed, also expose a kernels property @property def kernels(self): """ The kernels implementing this function. """ return [] cdef _pack_compute_args(object values, vector[CDatum]* out): for val in values: if isinstance(val, (list, np.ndarray)): val = lib.asarray(val) if isinstance(val, Array): out.push_back(CDatum(( val).sp_array)) continue elif isinstance(val, ChunkedArray): out.push_back(CDatum(( val).sp_chunked_array)) continue elif isinstance(val, Scalar): out.push_back(CDatum(( val).unwrap())) continue elif isinstance(val, RecordBatch): out.push_back(CDatum(( val).sp_batch)) continue elif isinstance(val, Table): out.push_back(CDatum(( val).sp_table)) continue else: # Is it a Python scalar? try: scal = lib.scalar(val) except Exception: # Raise dedicated error below pass else: out.push_back(CDatum(( scal).unwrap())) continue raise TypeError(f"Got unexpected argument type {type(val)} " "for compute function") cdef class FunctionRegistry(_Weakrefable): cdef CFunctionRegistry* registry def __init__(self): self.registry = GetFunctionRegistry() def list_functions(self): """ Return all function names in the registry. """ cdef vector[c_string] names = self.registry.GetFunctionNames() return [frombytes(name) for name in names] def get_function(self, name): """ Look up a function by name in the registry. Parameters ---------- name : str The name of the function to lookup """ cdef: c_string c_name = tobytes(name) shared_ptr[CFunction] func with nogil: func = GetResultValue(self.registry.GetFunction(c_name)) return wrap_function(func) cdef FunctionRegistry _global_func_registry = FunctionRegistry() def function_registry(): return _global_func_registry def get_function(name): """ Get a function by name. The function is looked up in the global registry (as returned by `function_registry()`). Parameters ---------- name : str The name of the function to lookup """ return _global_func_registry.get_function(name) def list_functions(): """ Return all function names in the global registry. """ return _global_func_registry.list_functions() def call_function(name, args, options=None, memory_pool=None, length=None): """ Call a named function. The function is looked up in the global registry (as returned by `function_registry()`). Parameters ---------- name : str The name of the function to call. args : list The arguments to the function. options : optional options provided to the function. memory_pool : MemoryPool, optional memory pool to use for allocations during function execution. length : int, optional Batch size for execution, for nullary (no argument) functions. If not passed, inferred from data. """ func = _global_func_registry.get_function(name) return func.call(args, options=options, memory_pool=memory_pool, length=length) cdef class FunctionOptions(_Weakrefable): __slots__ = () # avoid mistakingly creating attributes cdef const CFunctionOptions* get_options(self) except NULL: return self.wrapped.get() cdef void init(self, const shared_ptr[CFunctionOptions]& sp): self.wrapped = sp cdef inline shared_ptr[CFunctionOptions] unwrap(self): return self.wrapped def serialize(self): cdef: CResult[shared_ptr[CBuffer]] res = self.get_options().Serialize() shared_ptr[CBuffer] c_buf = GetResultValue(res) return pyarrow_wrap_buffer(c_buf) @staticmethod def deserialize(buf): """ Deserialize options for a function. Parameters ---------- buf : Buffer The buffer containing the data to deserialize. """ cdef: shared_ptr[CBuffer] c_buf = pyarrow_unwrap_buffer(buf) CResult[unique_ptr[CFunctionOptions]] maybe_options = \ DeserializeFunctionOptions(deref(c_buf)) shared_ptr[CFunctionOptions] c_options c_options = to_shared(GetResultValue(move(maybe_options))) type_name = frombytes(c_options.get().options_type().type_name()) module = globals() if type_name not in module: raise ValueError(f'Cannot deserialize "{type_name}"') klass = module[type_name] options = klass.__new__(klass) ( options).init(c_options) return options def __repr__(self): type_name = self.__class__.__name__ # Remove {} so we can use our own braces string_repr = frombytes(self.get_options().ToString())[1:-1] return f"{type_name}({string_repr})" def __eq__(self, FunctionOptions other): return self.get_options().Equals(deref(other.get_options())) def _raise_invalid_function_option(value, description, *, exception_class=ValueError): raise exception_class(f"\"{value}\" is not a valid {description}") # NOTE: # To properly expose the constructor signature of FunctionOptions # subclasses, we use a two-level inheritance: # 1. a C extension class that implements option validation and setting # (won't expose function signatures because of # https://github.com/cython/cython/issues/3873) # 2. a Python derived class that implements the constructor cdef class _CastOptions(FunctionOptions): cdef CCastOptions* options cdef void init(self, const shared_ptr[CFunctionOptions]& sp): FunctionOptions.init(self, sp) self.options = self.wrapped.get() def _set_options(self, DataType target_type, allow_int_overflow, allow_time_truncate, allow_time_overflow, allow_decimal_truncate, allow_float_truncate, allow_invalid_utf8): cdef: shared_ptr[CCastOptions] wrapped = make_shared[CCastOptions]() self.init( wrapped) self._set_type(target_type) if allow_int_overflow is not None: self.allow_int_overflow = allow_int_overflow if allow_time_truncate is not None: self.allow_time_truncate = allow_time_truncate if allow_time_overflow is not None: self.allow_time_overflow = allow_time_overflow if allow_decimal_truncate is not None: self.allow_decimal_truncate = allow_decimal_truncate if allow_float_truncate is not None: self.allow_float_truncate = allow_float_truncate if allow_invalid_utf8 is not None: self.allow_invalid_utf8 = allow_invalid_utf8 def _set_type(self, target_type=None): if target_type is not None: deref(self.options).to_type = \ ( ensure_type(target_type)).sp_type def _set_safe(self): self.init(shared_ptr[CFunctionOptions]( new CCastOptions(CCastOptions.Safe()))) def _set_unsafe(self): self.init(shared_ptr[CFunctionOptions]( new CCastOptions(CCastOptions.Unsafe()))) def is_safe(self): return not (deref(self.options).allow_int_overflow or deref(self.options).allow_time_truncate or deref(self.options).allow_time_overflow or deref(self.options).allow_decimal_truncate or deref(self.options).allow_float_truncate or deref(self.options).allow_invalid_utf8) @property def allow_int_overflow(self): return deref(self.options).allow_int_overflow @allow_int_overflow.setter def allow_int_overflow(self, c_bool flag): deref(self.options).allow_int_overflow = flag @property def allow_time_truncate(self): return deref(self.options).allow_time_truncate @allow_time_truncate.setter def allow_time_truncate(self, c_bool flag): deref(self.options).allow_time_truncate = flag @property def allow_time_overflow(self): return deref(self.options).allow_time_overflow @allow_time_overflow.setter def allow_time_overflow(self, c_bool flag): deref(self.options).allow_time_overflow = flag @property def allow_decimal_truncate(self): return deref(self.options).allow_decimal_truncate @allow_decimal_truncate.setter def allow_decimal_truncate(self, c_bool flag): deref(self.options).allow_decimal_truncate = flag @property def allow_float_truncate(self): return deref(self.options).allow_float_truncate @allow_float_truncate.setter def allow_float_truncate(self, c_bool flag): deref(self.options).allow_float_truncate = flag @property def allow_invalid_utf8(self): return deref(self.options).allow_invalid_utf8 @allow_invalid_utf8.setter def allow_invalid_utf8(self, c_bool flag): deref(self.options).allow_invalid_utf8 = flag class CastOptions(_CastOptions): """ Options for the `cast` function. Parameters ---------- target_type : DataType, optional The PyArrow type to cast to. allow_int_overflow : bool, default False Whether integer overflow is allowed when casting. allow_time_truncate : bool, default False Whether time precision truncation is allowed when casting. allow_time_overflow : bool, default False Whether date/time range overflow is allowed when casting. allow_decimal_truncate : bool, default False Whether decimal precision truncation is allowed when casting. allow_float_truncate : bool, default False Whether floating-point precision truncation is allowed when casting. allow_invalid_utf8 : bool, default False Whether producing invalid utf8 data is allowed when casting. """ def __init__(self, target_type=None, *, allow_int_overflow=None, allow_time_truncate=None, allow_time_overflow=None, allow_decimal_truncate=None, allow_float_truncate=None, allow_invalid_utf8=None): self._set_options(target_type, allow_int_overflow, allow_time_truncate, allow_time_overflow, allow_decimal_truncate, allow_float_truncate, allow_invalid_utf8) @staticmethod def safe(target_type=None): """" Create a CastOptions for a safe cast. Parameters ---------- target_type : optional Target cast type for the safe cast. """ self = CastOptions() self._set_safe() self._set_type(target_type) return self @staticmethod def unsafe(target_type=None): """" Create a CastOptions for an unsafe cast. Parameters ---------- target_type : optional Target cast type for the unsafe cast. """ self = CastOptions() self._set_unsafe() self._set_type(target_type) return self def _skip_nulls_doc(): # (note the weird indent because of how the string is inserted # by callers) return """skip_nulls : bool, default True Whether to skip (ignore) nulls in the input. If False, any null in the input forces the output to null. """ def _min_count_doc(*, default): return f"""min_count : int, default {default} Minimum number of non-null values in the input. If the number of non-null values is below `min_count`, the output is null. """ cdef class _ElementWiseAggregateOptions(FunctionOptions): def _set_options(self, skip_nulls): self.wrapped.reset(new CElementWiseAggregateOptions(skip_nulls)) class ElementWiseAggregateOptions(_ElementWiseAggregateOptions): __doc__ = f""" Options for element-wise aggregate functions. Parameters ---------- {_skip_nulls_doc()} """ def __init__(self, *, skip_nulls=True): self._set_options(skip_nulls) cdef CRoundMode unwrap_round_mode(round_mode) except *: if round_mode == "down": return CRoundMode_DOWN elif round_mode == "up": return CRoundMode_UP elif round_mode == "towards_zero": return CRoundMode_TOWARDS_ZERO elif round_mode == "towards_infinity": return CRoundMode_TOWARDS_INFINITY elif round_mode == "half_down": return CRoundMode_HALF_DOWN elif round_mode == "half_up": return CRoundMode_HALF_UP elif round_mode == "half_towards_zero": return CRoundMode_HALF_TOWARDS_ZERO elif round_mode == "half_towards_infinity": return CRoundMode_HALF_TOWARDS_INFINITY elif round_mode == "half_to_even": return CRoundMode_HALF_TO_EVEN elif round_mode == "half_to_odd": return CRoundMode_HALF_TO_ODD _raise_invalid_function_option(round_mode, "round mode") cdef class _RoundOptions(FunctionOptions): def _set_options(self, ndigits, round_mode): self.wrapped.reset( new CRoundOptions(ndigits, unwrap_round_mode(round_mode)) ) class RoundOptions(_RoundOptions): """ Options for rounding numbers. Parameters ---------- ndigits : int, default 0 Number of fractional digits to round to. round_mode : str, default "half_to_even" Rounding and tie-breaking mode. Accepted values are "down", "up", "towards_zero", "towards_infinity", "half_down", "half_up", "half_towards_zero", "half_towards_infinity", "half_to_even", "half_to_odd". """ def __init__(self, ndigits=0, round_mode="half_to_even"): self._set_options(ndigits, round_mode) cdef CCalendarUnit unwrap_round_temporal_unit(unit) except *: if unit == "nanosecond": return CCalendarUnit_NANOSECOND elif unit == "microsecond": return CCalendarUnit_MICROSECOND elif unit == "millisecond": return CCalendarUnit_MILLISECOND elif unit == "second": return CCalendarUnit_SECOND elif unit == "minute": return CCalendarUnit_MINUTE elif unit == "hour": return CCalendarUnit_HOUR elif unit == "day": return CCalendarUnit_DAY elif unit == "week": return CCalendarUnit_WEEK elif unit == "month": return CCalendarUnit_MONTH elif unit == "quarter": return CCalendarUnit_QUARTER elif unit == "year": return CCalendarUnit_YEAR _raise_invalid_function_option(unit, "Calendar unit") cdef class _RoundTemporalOptions(FunctionOptions): def _set_options(self, multiple, unit, week_starts_monday, ceil_is_strictly_greater, calendar_based_origin): self.wrapped.reset( new CRoundTemporalOptions( multiple, unwrap_round_temporal_unit(unit), week_starts_monday, ceil_is_strictly_greater, calendar_based_origin) ) class RoundTemporalOptions(_RoundTemporalOptions): """ Options for rounding temporal values. Parameters ---------- multiple : int, default 1 Number of units to round to. unit : str, default "day" The unit in which `multiple` is expressed. Accepted values are "year", "quarter", "month", "week", "day", "hour", "minute", "second", "millisecond", "microsecond", "nanosecond". week_starts_monday : bool, default True If True, weeks start on Monday; if False, on Sunday. ceil_is_strictly_greater : bool, default False If True, ceil returns a rounded value that is strictly greater than the input. For example: ceiling 1970-01-01T00:00:00 to 3 hours would yield 1970-01-01T03:00:00 if set to True and 1970-01-01T00:00:00 if set to False. This applies to the ceil_temporal function only. calendar_based_origin : bool, default False By default, the origin is 1970-01-01T00:00:00. By setting this to True, rounding origin will be beginning of one less precise calendar unit. E.g.: rounding to hours will use beginning of day as origin. By default time is rounded to a multiple of units since 1970-01-01T00:00:00. By setting calendar_based_origin to true, time will be rounded to number of units since the last greater calendar unit. For example: rounding to multiple of days since the beginning of the month or to hours since the beginning of the day. Exceptions: week and quarter are not used as greater units, therefore days will be rounded to the beginning of the month not week. Greater unit of week is a year. Note that ceiling and rounding might change sorting order of an array near greater unit change. For example rounding YYYY-mm-dd 23:00:00 to 5 hours will ceil and round to YYYY-mm-dd+1 01:00:00 and floor to YYYY-mm-dd 20:00:00. On the other hand YYYY-mm-dd+1 00:00:00 will ceil, round and floor to YYYY-mm-dd+1 00:00:00. This can break the order of an already ordered array. """ def __init__(self, multiple=1, unit="day", *, week_starts_monday=True, ceil_is_strictly_greater=False, calendar_based_origin=False): self._set_options(multiple, unit, week_starts_monday, ceil_is_strictly_greater, calendar_based_origin) cdef class _RoundToMultipleOptions(FunctionOptions): def _set_options(self, multiple, round_mode): if not isinstance(multiple, Scalar): try: multiple = lib.scalar(multiple) except Exception: _raise_invalid_function_option( multiple, "multiple type for RoundToMultipleOptions", exception_class=TypeError) self.wrapped.reset( new CRoundToMultipleOptions( pyarrow_unwrap_scalar(multiple), unwrap_round_mode(round_mode)) ) class RoundToMultipleOptions(_RoundToMultipleOptions): """ Options for rounding numbers to a multiple. Parameters ---------- multiple : numeric scalar, default 1.0 Multiple to round to. Should be a scalar of a type compatible with the argument to be rounded. round_mode : str, default "half_to_even" Rounding and tie-breaking mode. Accepted values are "down", "up", "towards_zero", "towards_infinity", "half_down", "half_up", "half_towards_zero", "half_towards_infinity", "half_to_even", "half_to_odd". """ def __init__(self, multiple=1.0, round_mode="half_to_even"): self._set_options(multiple, round_mode) cdef class _JoinOptions(FunctionOptions): _null_handling_map = { "emit_null": CJoinNullHandlingBehavior_EMIT_NULL, "skip": CJoinNullHandlingBehavior_SKIP, "replace": CJoinNullHandlingBehavior_REPLACE, } def _set_options(self, null_handling, null_replacement): try: self.wrapped.reset( new CJoinOptions(self._null_handling_map[null_handling], tobytes(null_replacement)) ) except KeyError: _raise_invalid_function_option(null_handling, "null handling") class JoinOptions(_JoinOptions): """ Options for the `binary_join_element_wise` function. Parameters ---------- null_handling : str, default "emit_null" How to handle null values in the inputs. Accepted values are "emit_null", "skip", "replace". null_replacement : str, default "" Replacement string to emit for null inputs if `null_handling` is "replace". """ def __init__(self, null_handling="emit_null", null_replacement=""): self._set_options(null_handling, null_replacement) cdef class _MatchSubstringOptions(FunctionOptions): def _set_options(self, pattern, ignore_case): self.wrapped.reset( new CMatchSubstringOptions(tobytes(pattern), ignore_case) ) class MatchSubstringOptions(_MatchSubstringOptions): """ Options for looking for a substring. Parameters ---------- pattern : str Substring pattern to look for inside input values. ignore_case : bool, default False Whether to perform a case-insensitive match. """ def __init__(self, pattern, *, ignore_case=False): self._set_options(pattern, ignore_case) cdef class _PadOptions(FunctionOptions): def _set_options(self, width, padding): self.wrapped.reset(new CPadOptions(width, tobytes(padding))) class PadOptions(_PadOptions): """ Options for padding strings. Parameters ---------- width : int Desired string length. padding : str, default " " What to pad the string with. Should be one byte or codepoint. """ def __init__(self, width, padding=' '): self._set_options(width, padding) cdef class _TrimOptions(FunctionOptions): def _set_options(self, characters): self.wrapped.reset(new CTrimOptions(tobytes(characters))) class TrimOptions(_TrimOptions): """ Options for trimming characters from strings. Parameters ---------- characters : str Individual characters to be trimmed from the string. """ def __init__(self, characters): self._set_options(tobytes(characters)) cdef class _ReplaceSubstringOptions(FunctionOptions): def _set_options(self, pattern, replacement, max_replacements): self.wrapped.reset( new CReplaceSubstringOptions(tobytes(pattern), tobytes(replacement), max_replacements) ) class ReplaceSubstringOptions(_ReplaceSubstringOptions): """ Options for replacing matched substrings. Parameters ---------- pattern : str Substring pattern to look for inside input values. replacement : str What to replace the pattern with. max_replacements : int or None, default None The maximum number of strings to replace in each input value (unlimited if None). """ def __init__(self, pattern, replacement, *, max_replacements=None): if max_replacements is None: max_replacements = -1 self._set_options(pattern, replacement, max_replacements) cdef class _ExtractRegexOptions(FunctionOptions): def _set_options(self, pattern): self.wrapped.reset(new CExtractRegexOptions(tobytes(pattern))) class ExtractRegexOptions(_ExtractRegexOptions): """ Options for the `extract_regex` function. Parameters ---------- pattern : str Regular expression with named capture fields. """ def __init__(self, pattern): self._set_options(pattern) cdef class _SliceOptions(FunctionOptions): def _set_options(self, start, stop, step): self.wrapped.reset(new CSliceOptions(start, stop, step)) class SliceOptions(_SliceOptions): """ Options for slicing. Parameters ---------- start : int Index to start slicing at (inclusive). stop : int or None, default None If given, index to stop slicing at (exclusive). If not given, slicing will stop at the end. step : int, default 1 Slice step. """ def __init__(self, start, stop=None, step=1): if stop is None: stop = sys.maxsize self._set_options(start, stop, step) cdef class _ListSliceOptions(FunctionOptions): cpdef _set_options(self, start, stop=None, step=1, return_fixed_size_list=None): cdef: CListSliceOptions* opts opts = new CListSliceOptions( start, nullopt if stop is None else (stop), step, nullopt if return_fixed_size_list is None else (return_fixed_size_list) ) self.wrapped.reset(opts) class ListSliceOptions(_ListSliceOptions): """ Options for list array slicing. Parameters ---------- start : int Index to start slicing inner list elements (inclusive). stop : Optional[int], default None If given, index to stop slicing at (exclusive). If not given, slicing will stop at the end. (NotImplemented) step : int, default 1 Slice step. return_fixed_size_list : Optional[bool], default None Whether to return a FixedSizeListArray. If true _and_ stop is after a list element's length, nulls will be appended to create the requested slice size. The default of `None` will return the same type which was passed in. """ def __init__(self, start, stop=None, step=1, return_fixed_size_list=None): self._set_options(start, stop, step, return_fixed_size_list) cdef class _ReplaceSliceOptions(FunctionOptions): def _set_options(self, start, stop, replacement): self.wrapped.reset( new CReplaceSliceOptions(start, stop, tobytes(replacement)) ) class ReplaceSliceOptions(_ReplaceSliceOptions): """ Options for replacing slices. Parameters ---------- start : int Index to start slicing at (inclusive). stop : int Index to stop slicing at (exclusive). replacement : str What to replace the slice with. """ def __init__(self, start, stop, replacement): self._set_options(start, stop, replacement) cdef class _FilterOptions(FunctionOptions): _null_selection_map = { "drop": CFilterNullSelectionBehavior_DROP, "emit_null": CFilterNullSelectionBehavior_EMIT_NULL, } def _set_options(self, null_selection_behavior): try: self.wrapped.reset( new CFilterOptions( self._null_selection_map[null_selection_behavior] ) ) except KeyError: _raise_invalid_function_option(null_selection_behavior, "null selection behavior") class FilterOptions(_FilterOptions): """ Options for selecting with a boolean filter. Parameters ---------- null_selection_behavior : str, default "drop" How to handle nulls in the selection filter. Accepted values are "drop", "emit_null". """ def __init__(self, null_selection_behavior="drop"): self._set_options(null_selection_behavior) cdef class _DictionaryEncodeOptions(FunctionOptions): _null_encoding_map = { "encode": CDictionaryEncodeNullEncodingBehavior_ENCODE, "mask": CDictionaryEncodeNullEncodingBehavior_MASK, } def _set_options(self, null_encoding): try: self.wrapped.reset( new CDictionaryEncodeOptions( self._null_encoding_map[null_encoding] ) ) except KeyError: _raise_invalid_function_option(null_encoding, "null encoding") class DictionaryEncodeOptions(_DictionaryEncodeOptions): """ Options for dictionary encoding. Parameters ---------- null_encoding : str, default "mask" How to encode nulls in the input. Accepted values are "mask" (null inputs emit a null in the indices array), "encode" (null inputs emit a non-null index pointing to a null value in the dictionary array). """ def __init__(self, null_encoding="mask"): self._set_options(null_encoding) cdef class _TakeOptions(FunctionOptions): def _set_options(self, boundscheck): self.wrapped.reset(new CTakeOptions(boundscheck)) class TakeOptions(_TakeOptions): """ Options for the `take` and `array_take` functions. Parameters ---------- boundscheck : boolean, default True Whether to check indices are within bounds. If False and an index is out of boundes, behavior is undefined (the process may crash). """ def __init__(self, *, boundscheck=True): self._set_options(boundscheck) cdef class _MakeStructOptions(FunctionOptions): def _set_options(self, field_names, field_nullability, field_metadata): cdef: vector[c_string] c_field_names vector[shared_ptr[const CKeyValueMetadata]] c_field_metadata for name in field_names: c_field_names.push_back(tobytes(name)) for metadata in field_metadata: c_field_metadata.push_back(pyarrow_unwrap_metadata(metadata)) self.wrapped.reset( new CMakeStructOptions(c_field_names, field_nullability, c_field_metadata) ) class MakeStructOptions(_MakeStructOptions): """ Options for the `make_struct` function. Parameters ---------- field_names : sequence of str Names of the struct fields to create. field_nullability : sequence of bool, optional Nullability information for each struct field. If omitted, all fields are nullable. field_metadata : sequence of KeyValueMetadata, optional Metadata for each struct field. """ def __init__(self, field_names=(), *, field_nullability=None, field_metadata=None): if field_nullability is None: field_nullability = [True] * len(field_names) if field_metadata is None: field_metadata = [None] * len(field_names) self._set_options(field_names, field_nullability, field_metadata) cdef class _StructFieldOptions(FunctionOptions): def _set_options(self, indices): cdef: CFieldRef field_ref const CFieldRef* field_ref_ptr if isinstance(indices, (list, tuple)): if len(indices): indices = Expression._nested_field(tuple(indices)) else: # Allow empty indices; effecitively return same array self.wrapped.reset( new CStructFieldOptions(indices)) return if isinstance(indices, Expression): field_ref_ptr = (indices).unwrap().field_ref() if field_ref_ptr is NULL: raise ValueError("Unable to get CFieldRef from Expression") field_ref = deref(field_ref_ptr) elif isinstance(indices, (bytes, str)): if indices.startswith(b'.' if isinstance(indices, bytes) else '.'): field_ref = GetResultValue( CFieldRef.FromDotPath(tobytes(indices))) else: field_ref = CFieldRef(tobytes(indices)) elif isinstance(indices, int): field_ref = CFieldRef( indices) else: raise TypeError("Expected List[str], List[int], List[bytes], " "Expression, bytes, str, or int. " f"Got: {type(indices)}") self.wrapped.reset(new CStructFieldOptions(field_ref)) class StructFieldOptions(_StructFieldOptions): """ Options for the `struct_field` function. Parameters ---------- indices : List[str], List[bytes], List[int], Expression, bytes, str, or int List of indices for chained field lookup, for example `[4, 1]` will look up the second nested field in the fifth outer field. """ def __init__(self, indices): self._set_options(indices) cdef class _ScalarAggregateOptions(FunctionOptions): def _set_options(self, skip_nulls, min_count): self.wrapped.reset(new CScalarAggregateOptions(skip_nulls, min_count)) class ScalarAggregateOptions(_ScalarAggregateOptions): __doc__ = f""" Options for scalar aggregations. Parameters ---------- {_skip_nulls_doc()} {_min_count_doc(default=1)} """ def __init__(self, *, skip_nulls=True, min_count=1): self._set_options(skip_nulls, min_count) cdef class _CountOptions(FunctionOptions): _mode_map = { "only_valid": CCountMode_ONLY_VALID, "only_null": CCountMode_ONLY_NULL, "all": CCountMode_ALL, } def _set_options(self, mode): try: self.wrapped.reset(new CCountOptions(self._mode_map[mode])) except KeyError: _raise_invalid_function_option(mode, "count mode") class CountOptions(_CountOptions): """ Options for the `count` function. Parameters ---------- mode : str, default "only_valid" Which values to count in the input. Accepted values are "only_valid", "only_null", "all". """ def __init__(self, mode="only_valid"): self._set_options(mode) cdef class _IndexOptions(FunctionOptions): def _set_options(self, scalar): self.wrapped.reset(new CIndexOptions(pyarrow_unwrap_scalar(scalar))) class IndexOptions(_IndexOptions): """ Options for the `index` function. Parameters ---------- value : Scalar The value to search for. """ def __init__(self, value): self._set_options(value) cdef class _MapLookupOptions(FunctionOptions): _occurrence_map = { "all": CMapLookupOccurrence_ALL, "first": CMapLookupOccurrence_FIRST, "last": CMapLookupOccurrence_LAST, } def _set_options(self, query_key, occurrence): try: self.wrapped.reset( new CMapLookupOptions( pyarrow_unwrap_scalar(query_key), self._occurrence_map[occurrence] ) ) except KeyError: _raise_invalid_function_option(occurrence, "Should either be first, last, or all") class MapLookupOptions(_MapLookupOptions): """ Options for the `map_lookup` function. Parameters ---------- query_key : Scalar The key to search for. occurrence : str The occurrence(s) to return from the Map Accepted values are "first", "last", or "all". """ def __init__(self, query_key, occurrence): self._set_options(query_key, occurrence) cdef class _ModeOptions(FunctionOptions): def _set_options(self, n, skip_nulls, min_count): self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count)) class ModeOptions(_ModeOptions): __doc__ = f""" Options for the `mode` function. Parameters ---------- n : int, default 1 Number of distinct most-common values to return. {_skip_nulls_doc()} {_min_count_doc(default=0)} """ def __init__(self, n=1, *, skip_nulls=True, min_count=0): self._set_options(n, skip_nulls, min_count) cdef class _SetLookupOptions(FunctionOptions): def _set_options(self, value_set, c_bool skip_nulls): cdef unique_ptr[CDatum] valset if isinstance(value_set, Array): valset.reset(new CDatum(( value_set).sp_array)) elif isinstance(value_set, ChunkedArray): valset.reset( new CDatum(( value_set).sp_chunked_array) ) elif isinstance(value_set, Scalar): valset.reset(new CDatum(( value_set).unwrap())) else: _raise_invalid_function_option(value_set, "value set", exception_class=TypeError) self.wrapped.reset(new CSetLookupOptions(deref(valset), skip_nulls)) class SetLookupOptions(_SetLookupOptions): """ Options for the `is_in` and `index_in` functions. Parameters ---------- value_set : Array Set of values to look for in the input. skip_nulls : bool, default False If False, nulls in the input are matched in the value_set just like regular values. If True, nulls in the input always fail matching. """ def __init__(self, value_set, *, skip_nulls=False): self._set_options(value_set, skip_nulls) cdef class _StrptimeOptions(FunctionOptions): _unit_map = { "s": TimeUnit_SECOND, "ms": TimeUnit_MILLI, "us": TimeUnit_MICRO, "ns": TimeUnit_NANO, } def _set_options(self, format, unit, error_is_null): try: self.wrapped.reset( new CStrptimeOptions(tobytes(format), self._unit_map[unit], error_is_null) ) except KeyError: _raise_invalid_function_option(unit, "time unit") class StrptimeOptions(_StrptimeOptions): """ Options for the `strptime` function. Parameters ---------- format : str Pattern for parsing input strings as timestamps, such as "%Y/%m/%d". unit : str Timestamp unit of the output. Accepted values are "s", "ms", "us", "ns". error_is_null : boolean, default False Return null on parsing errors if true or raise if false. """ def __init__(self, format, unit, error_is_null=False): self._set_options(format, unit, error_is_null) cdef class _StrftimeOptions(FunctionOptions): def _set_options(self, format, locale): self.wrapped.reset( new CStrftimeOptions(tobytes(format), tobytes(locale)) ) class StrftimeOptions(_StrftimeOptions): """ Options for the `strftime` function. Parameters ---------- format : str, default "%Y-%m-%dT%H:%M:%S" Pattern for formatting input values. locale : str, default "C" Locale to use for locale-specific format specifiers. """ def __init__(self, format="%Y-%m-%dT%H:%M:%S", locale="C"): self._set_options(format, locale) cdef class _DayOfWeekOptions(FunctionOptions): def _set_options(self, count_from_zero, week_start): self.wrapped.reset( new CDayOfWeekOptions(count_from_zero, week_start) ) class DayOfWeekOptions(_DayOfWeekOptions): """ Options for the `day_of_week` function. Parameters ---------- count_from_zero : bool, default True If True, number days from 0, otherwise from 1. week_start : int, default 1 Which day does the week start with (Monday=1, Sunday=7). How this value is numbered is unaffected by `count_from_zero`. """ def __init__(self, *, count_from_zero=True, week_start=1): self._set_options(count_from_zero, week_start) cdef class _WeekOptions(FunctionOptions): def _set_options(self, week_starts_monday, count_from_zero, first_week_is_fully_in_year): self.wrapped.reset( new CWeekOptions(week_starts_monday, count_from_zero, first_week_is_fully_in_year) ) class WeekOptions(_WeekOptions): """ Options for the `week` function. Parameters ---------- week_starts_monday : bool, default True If True, weeks start on Monday; if False, on Sunday. count_from_zero : bool, default False If True, dates at the start of a year that fall into the last week of the previous year emit 0. If False, they emit 52 or 53 (the week number of the last week of the previous year). first_week_is_fully_in_year : bool, default False If True, week number 0 is fully in January. If False, a week that begins on December 29, 30 or 31 is considered to be week number 0 of the following year. """ def __init__(self, *, week_starts_monday=True, count_from_zero=False, first_week_is_fully_in_year=False): self._set_options(week_starts_monday, count_from_zero, first_week_is_fully_in_year) cdef class _AssumeTimezoneOptions(FunctionOptions): _ambiguous_map = { "raise": CAssumeTimezoneAmbiguous_AMBIGUOUS_RAISE, "earliest": CAssumeTimezoneAmbiguous_AMBIGUOUS_EARLIEST, "latest": CAssumeTimezoneAmbiguous_AMBIGUOUS_LATEST, } _nonexistent_map = { "raise": CAssumeTimezoneNonexistent_NONEXISTENT_RAISE, "earliest": CAssumeTimezoneNonexistent_NONEXISTENT_EARLIEST, "latest": CAssumeTimezoneNonexistent_NONEXISTENT_LATEST, } def _set_options(self, timezone, ambiguous, nonexistent): if ambiguous not in self._ambiguous_map: _raise_invalid_function_option(ambiguous, "'ambiguous' timestamp handling") if nonexistent not in self._nonexistent_map: _raise_invalid_function_option(nonexistent, "'nonexistent' timestamp handling") self.wrapped.reset( new CAssumeTimezoneOptions(tobytes(timezone), self._ambiguous_map[ambiguous], self._nonexistent_map[nonexistent]) ) class AssumeTimezoneOptions(_AssumeTimezoneOptions): """ Options for the `assume_timezone` function. Parameters ---------- timezone : str Timezone to assume for the input. ambiguous : str, default "raise" How to handle timestamps that are ambiguous in the assumed timezone. Accepted values are "raise", "earliest", "latest". nonexistent : str, default "raise" How to handle timestamps that don't exist in the assumed timezone. Accepted values are "raise", "earliest", "latest". """ def __init__(self, timezone, *, ambiguous="raise", nonexistent="raise"): self._set_options(timezone, ambiguous, nonexistent) cdef class _NullOptions(FunctionOptions): def _set_options(self, nan_is_null): self.wrapped.reset(new CNullOptions(nan_is_null)) class NullOptions(_NullOptions): """ Options for the `is_null` function. Parameters ---------- nan_is_null : bool, default False Whether floating-point NaN values are considered null. """ def __init__(self, *, nan_is_null=False): self._set_options(nan_is_null) cdef class _VarianceOptions(FunctionOptions): def _set_options(self, ddof, skip_nulls, min_count): self.wrapped.reset(new CVarianceOptions(ddof, skip_nulls, min_count)) class VarianceOptions(_VarianceOptions): __doc__ = f""" Options for the `variance` and `stddev` functions. Parameters ---------- ddof : int, default 0 Number of degrees of freedom. {_skip_nulls_doc()} {_min_count_doc(default=0)} """ def __init__(self, *, ddof=0, skip_nulls=True, min_count=0): self._set_options(ddof, skip_nulls, min_count) cdef class _SplitOptions(FunctionOptions): def _set_options(self, max_splits, reverse): self.wrapped.reset(new CSplitOptions(max_splits, reverse)) class SplitOptions(_SplitOptions): """ Options for splitting on whitespace. Parameters ---------- max_splits : int or None, default None Maximum number of splits for each input value (unlimited if None). reverse : bool, default False Whether to start splitting from the end of each input value. This only has an effect if `max_splits` is not None. """ def __init__(self, *, max_splits=None, reverse=False): if max_splits is None: max_splits = -1 self._set_options(max_splits, reverse) cdef class _SplitPatternOptions(FunctionOptions): def _set_options(self, pattern, max_splits, reverse): self.wrapped.reset( new CSplitPatternOptions(tobytes(pattern), max_splits, reverse) ) class SplitPatternOptions(_SplitPatternOptions): """ Options for splitting on a string pattern. Parameters ---------- pattern : str String pattern to split on. max_splits : int or None, default None Maximum number of splits for each input value (unlimited if None). reverse : bool, default False Whether to start splitting from the end of each input value. This only has an effect if `max_splits` is not None. """ def __init__(self, pattern, *, max_splits=None, reverse=False): if max_splits is None: max_splits = -1 self._set_options(pattern, max_splits, reverse) cdef CSortOrder unwrap_sort_order(order) except *: if order == "ascending": return CSortOrder_Ascending elif order == "descending": return CSortOrder_Descending _raise_invalid_function_option(order, "sort order") cdef CNullPlacement unwrap_null_placement(null_placement) except *: if null_placement == "at_start": return CNullPlacement_AtStart elif null_placement == "at_end": return CNullPlacement_AtEnd _raise_invalid_function_option(null_placement, "null placement") cdef class _PartitionNthOptions(FunctionOptions): def _set_options(self, pivot, null_placement): self.wrapped.reset(new CPartitionNthOptions( pivot, unwrap_null_placement(null_placement))) class PartitionNthOptions(_PartitionNthOptions): """ Options for the `partition_nth_indices` function. Parameters ---------- pivot : int Index into the equivalent sorted array of the pivot element. null_placement : str, default "at_end" Where nulls in the input should be partitioned. Accepted values are "at_start", "at_end". """ def __init__(self, pivot, *, null_placement="at_end"): self._set_options(pivot, null_placement) cdef class _CumulativeSumOptions(FunctionOptions): def _set_options(self, start, skip_nulls): if not isinstance(start, Scalar): try: start = lib.scalar(start) except Exception: _raise_invalid_function_option( start, "`start` type for CumulativeSumOptions", TypeError) self.wrapped.reset(new CCumulativeSumOptions(( start).unwrap(), skip_nulls)) class CumulativeSumOptions(_CumulativeSumOptions): """ Options for `cumulative_sum` function. Parameters ---------- start : Scalar, default 0.0 Starting value for sum computation skip_nulls : bool, default False When false, the first encountered null is propagated. """ def __init__(self, start=0.0, *, skip_nulls=False): self._set_options(start, skip_nulls) cdef class _ArraySortOptions(FunctionOptions): def _set_options(self, order, null_placement): self.wrapped.reset(new CArraySortOptions( unwrap_sort_order(order), unwrap_null_placement(null_placement))) class ArraySortOptions(_ArraySortOptions): """ Options for the `array_sort_indices` function. Parameters ---------- order : str, default "ascending" Which order to sort values in. Accepted values are "ascending", "descending". null_placement : str, default "at_end" Where nulls in the input should be sorted. Accepted values are "at_start", "at_end". """ def __init__(self, order="ascending", *, null_placement="at_end"): self._set_options(order, null_placement) cdef class _SortOptions(FunctionOptions): def _set_options(self, sort_keys, null_placement): cdef vector[CSortKey] c_sort_keys for name, order in sort_keys: c_sort_keys.push_back( CSortKey(tobytes(name), unwrap_sort_order(order)) ) self.wrapped.reset(new CSortOptions( c_sort_keys, unwrap_null_placement(null_placement))) class SortOptions(_SortOptions): """ Options for the `sort_indices` function. Parameters ---------- sort_keys : sequence of (name, order) tuples Names of field/column keys to sort the input on, along with the order each field/column is sorted in. Accepted values for `order` are "ascending", "descending". null_placement : str, default "at_end" Where nulls in input should be sorted, only applying to columns/fields mentioned in `sort_keys`. Accepted values are "at_start", "at_end". """ def __init__(self, sort_keys=(), *, null_placement="at_end"): self._set_options(sort_keys, null_placement) cdef class _SelectKOptions(FunctionOptions): def _set_options(self, k, sort_keys): cdef vector[CSortKey] c_sort_keys for name, order in sort_keys: c_sort_keys.push_back( CSortKey(tobytes(name), unwrap_sort_order(order)) ) self.wrapped.reset(new CSelectKOptions(k, c_sort_keys)) class SelectKOptions(_SelectKOptions): """ Options for top/bottom k-selection. Parameters ---------- k : int Number of leading values to select in sorted order (i.e. the largest values if sort order is "descending", the smallest otherwise). sort_keys : sequence of (name, order) tuples Names of field/column keys to sort the input on, along with the order each field/column is sorted in. Accepted values for `order` are "ascending", "descending". """ def __init__(self, k, sort_keys): self._set_options(k, sort_keys) cdef class _QuantileOptions(FunctionOptions): _interp_map = { "linear": CQuantileInterp_LINEAR, "lower": CQuantileInterp_LOWER, "higher": CQuantileInterp_HIGHER, "nearest": CQuantileInterp_NEAREST, "midpoint": CQuantileInterp_MIDPOINT, } def _set_options(self, quantiles, interp, skip_nulls, min_count): try: self.wrapped.reset( new CQuantileOptions(quantiles, self._interp_map[interp], skip_nulls, min_count) ) except KeyError: _raise_invalid_function_option(interp, "quantile interpolation") class QuantileOptions(_QuantileOptions): __doc__ = f""" Options for the `quantile` function. Parameters ---------- q : double or sequence of double, default 0.5 Quantiles to compute. All values must be in [0, 1]. interpolation : str, default "linear" How to break ties between competing data points for a given quantile. Accepted values are: - "linear": compute an interpolation - "lower": always use the smallest of the two data points - "higher": always use the largest of the two data points - "nearest": select the data point that is closest to the quantile - "midpoint": compute the (unweighted) mean of the two data points {_skip_nulls_doc()} {_min_count_doc(default=0)} """ def __init__(self, q=0.5, *, interpolation="linear", skip_nulls=True, min_count=0): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] self._set_options(q, interpolation, skip_nulls, min_count) cdef class _TDigestOptions(FunctionOptions): def _set_options(self, quantiles, delta, buffer_size, skip_nulls, min_count): self.wrapped.reset( new CTDigestOptions(quantiles, delta, buffer_size, skip_nulls, min_count) ) class TDigestOptions(_TDigestOptions): __doc__ = f""" Options for the `tdigest` function. Parameters ---------- q : double or sequence of double, default 0.5 Quantiles to approximate. All values must be in [0, 1]. delta : int, default 100 Compression parameter for the T-digest algorithm. buffer_size : int, default 500 Buffer size for the T-digest algorithm. {_skip_nulls_doc()} {_min_count_doc(default=0)} """ def __init__(self, q=0.5, *, delta=100, buffer_size=500, skip_nulls=True, min_count=0): if not isinstance(q, (list, tuple, np.ndarray)): q = [q] self._set_options(q, delta, buffer_size, skip_nulls, min_count) cdef class _Utf8NormalizeOptions(FunctionOptions): _form_map = { "NFC": CUtf8NormalizeForm_NFC, "NFKC": CUtf8NormalizeForm_NFKC, "NFD": CUtf8NormalizeForm_NFD, "NFKD": CUtf8NormalizeForm_NFKD, } def _set_options(self, form): try: self.wrapped.reset( new CUtf8NormalizeOptions(self._form_map[form]) ) except KeyError: _raise_invalid_function_option(form, "Unicode normalization form") class Utf8NormalizeOptions(_Utf8NormalizeOptions): """ Options for the `utf8_normalize` function. Parameters ---------- form : str Unicode normalization form. Accepted values are "NFC", "NFKC", "NFD", NFKD". """ def __init__(self, form): self._set_options(form) cdef class _RandomOptions(FunctionOptions): def _set_options(self, initializer): if initializer == 'system': self.wrapped.reset(new CRandomOptions( CRandomOptions.FromSystemRandom())) return if not isinstance(initializer, int): try: initializer = hash(initializer) except TypeError: raise TypeError( f"initializer should be 'system', an integer, " f"or a hashable object; got {initializer!r}") if initializer < 0: initializer += 2**64 self.wrapped.reset(new CRandomOptions( CRandomOptions.FromSeed(initializer))) class RandomOptions(_RandomOptions): """ Options for random generation. Parameters ---------- initializer : int or str How to initialize the underlying random generator. If an integer is given, it is used as a seed. If "system" is given, the random generator is initialized with a system-specific source of (hopefully true) randomness. Other values are invalid. """ def __init__(self, *, initializer='system'): self._set_options(initializer) cdef class _RankOptions(FunctionOptions): _tiebreaker_map = { "min": CRankOptionsTiebreaker_Min, "max": CRankOptionsTiebreaker_Max, "first": CRankOptionsTiebreaker_First, "dense": CRankOptionsTiebreaker_Dense, } def _set_options(self, sort_keys, null_placement, tiebreaker): cdef vector[CSortKey] c_sort_keys if isinstance(sort_keys, str): c_sort_keys.push_back( CSortKey(tobytes(""), unwrap_sort_order(sort_keys)) ) else: for name, order in sort_keys: c_sort_keys.push_back( CSortKey(tobytes(name), unwrap_sort_order(order)) ) try: self.wrapped.reset( new CRankOptions(c_sort_keys, unwrap_null_placement(null_placement), self._tiebreaker_map[tiebreaker]) ) except KeyError: _raise_invalid_function_option(tiebreaker, "tiebreaker") class RankOptions(_RankOptions): """ Options for the `rank` function. Parameters ---------- sort_keys : sequence of (name, order) tuples or str, default "ascending" Names of field/column keys to sort the input on, along with the order each field/column is sorted in. Accepted values for `order` are "ascending", "descending". Alternatively, one can simply pass "ascending" or "descending" as a string if the input is array-like. null_placement : str, default "at_end" Where nulls in input should be sorted. Accepted values are "at_start", "at_end". tiebreaker : str, default "first" Configure how ties between equal values are handled. Accepted values are: - "min": Ties get the smallest possible rank in sorted order. - "max": Ties get the largest possible rank in sorted order. - "first": Ranks are assigned in order of when ties appear in the input. This ensures the ranks are a stable permutation of the input. - "dense": The ranks span a dense [1, M] interval where M is the number of distinct values in the input. """ def __init__(self, sort_keys="ascending", *, null_placement="at_end", tiebreaker="first"): self._set_options(sort_keys, null_placement, tiebreaker) def _group_by(args, keys, aggregations): cdef: vector[CDatum] c_args vector[CDatum] c_keys vector[CAggregate] c_aggregations CDatum result CAggregate c_aggr _pack_compute_args(args, &c_args) _pack_compute_args(keys, &c_keys) for aggr_func_name, aggr_opts in aggregations: c_aggr.function = tobytes(aggr_func_name) if aggr_opts is not None: c_aggr.options = (aggr_opts).wrapped else: c_aggr.options = nullptr c_aggregations.push_back(c_aggr) with nogil: result = GetResultValue( GroupBy(c_args, c_keys, c_aggregations) ) return wrap_datum(result) cdef class Expression(_Weakrefable): """ A logical expression to be evaluated against some input. To create an expression: - Use the factory function ``pyarrow.compute.scalar()`` to create a scalar (not necessary when combined, see example below). - Use the factory function ``pyarrow.compute.field()`` to reference a field (column in table). - Compare fields and scalars with ``<``, ``<=``, ``==``, ``>=``, ``>``. - Combine expressions using python operators ``&`` (logical and), ``|`` (logical or) and ``~`` (logical not). Note: python keywords ``and``, ``or`` and ``not`` cannot be used to combine expressions. - Create expression predicates using Expression methods such as ``pyarrow.compute.Expression.isin()``. Examples -------- >>> import pyarrow.compute as pc >>> (pc.field("a") < pc.scalar(3)) | (pc.field("b") > 7) 7))> >>> pc.field('a') != 3 >>> pc.field('a').isin([1, 2, 3]) """ def __init__(self): msg = 'Expression is an abstract class thus cannot be initialized.' raise TypeError(msg) cdef void init(self, const CExpression& sp): self.expr = sp @staticmethod cdef wrap(const CExpression& sp): cdef Expression self = Expression.__new__(Expression) self.init(sp) return self cdef inline CExpression unwrap(self): return self.expr def equals(self, Expression other): return self.expr.Equals(other.unwrap()) def __str__(self): return frombytes(self.expr.ToString()) def __repr__(self): return "".format( self.__class__.__name__, str(self) ) @staticmethod def _deserialize(Buffer buffer not None): return Expression.wrap(GetResultValue(CDeserializeExpression( pyarrow_unwrap_buffer(buffer)))) def __reduce__(self): buffer = pyarrow_wrap_buffer(GetResultValue( CSerializeExpression(self.expr))) return Expression._deserialize, (buffer,) @staticmethod cdef Expression _expr_or_scalar(object expr): if isinstance(expr, Expression): return ( expr) return ( Expression._scalar(expr)) @staticmethod def _call(str function_name, list arguments, FunctionOptions options=None): cdef: vector[CExpression] c_arguments shared_ptr[CFunctionOptions] c_options for argument in arguments: if not isinstance(argument, Expression): # Attempt to help convert this to an expression try: argument = Expression._scalar(argument) except ArrowInvalid: raise TypeError( "only other expressions allowed as arguments") c_arguments.push_back(( argument).expr) if options is not None: c_options = options.unwrap() return Expression.wrap(CMakeCallExpression( tobytes(function_name), move(c_arguments), c_options)) def __richcmp__(self, other, int op): other = Expression._expr_or_scalar(other) return Expression._call({ Py_EQ: "equal", Py_NE: "not_equal", Py_GT: "greater", Py_GE: "greater_equal", Py_LT: "less", Py_LE: "less_equal", }[op], [self, other]) def __bool__(self): raise ValueError( "An Expression cannot be evaluated to python True or False. " "If you are using the 'and', 'or' or 'not' operators, use '&', " "'|' or '~' instead." ) def __invert__(self): return Expression._call("invert", [self]) def __and__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("and_kleene", [self, other]) def __or__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("or_kleene", [self, other]) def __add__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("add_checked", [self, other]) def __mul__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("multiply_checked", [self, other]) def __sub__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("subtract_checked", [self, other]) def __truediv__(Expression self, other): other = Expression._expr_or_scalar(other) return Expression._call("divide_checked", [self, other]) def is_valid(self): """ Check whether the expression is not-null (valid). This creates a new expression equivalent to calling the `is_valid` compute function on this expression. Returns ------- is_valid : Expression """ return Expression._call("is_valid", [self]) def is_null(self, bint nan_is_null=False): """ Check whether the expression is null. This creates a new expression equivalent to calling the `is_null` compute function on this expression. Parameters ---------- nan_is_null : boolean, default False Whether floating-point NaNs are considered null. Returns ------- is_null : Expression """ options = NullOptions(nan_is_null=nan_is_null) return Expression._call("is_null", [self], options) def cast(self, type=None, safe=None, options=None): """ Explicitly set or change the expression's data type. This creates a new expression equivalent to calling the `cast` compute function on this expression. Parameters ---------- type : DataType, default None Type to cast array to. safe : boolean, default True Whether to check for conversion errors such as overflow. options : CastOptions, default None Additional checks pass by CastOptions Returns ------- cast : Expression """ safe_vars_passed = (safe is not None) or (type is not None) if safe_vars_passed and (options is not None): raise ValueError("Must either pass values for 'type' and 'safe' or pass a " "value for 'options'") if options is None: type = ensure_type(type, allow_none=False) if safe is False: options = CastOptions.unsafe(type) else: options = CastOptions.safe(type) return Expression._call("cast", [self], options) def isin(self, values): """ Check whether the expression is contained in values. This creates a new expression equivalent to calling the `is_in` compute function on this expression. Parameters ---------- values : Array or iterable The values to check for. Returns ------- isin : Expression A new expression that, when evaluated, checks whether this expression's value is contained in `values`. """ if not isinstance(values, Array): values = lib.array(values) options = SetLookupOptions(values) return Expression._call("is_in", [self], options) @staticmethod def _field(name_or_idx not None): cdef: CFieldRef c_field if isinstance(name_or_idx, int): return Expression.wrap(CMakeFieldExpressionByIndex(name_or_idx)) else: c_field = CFieldRef( tobytes(name_or_idx)) return Expression.wrap(CMakeFieldExpression(c_field)) @staticmethod def _nested_field(tuple names not None): cdef: vector[CFieldRef] nested if len(names) == 0: raise ValueError("nested field reference should be non-empty") nested.reserve(len(names)) for name in names: if isinstance(name, int): nested.push_back(CFieldRef(name)) else: nested.push_back(CFieldRef( tobytes(name))) return Expression.wrap(CMakeFieldExpression(CFieldRef(move(nested)))) @staticmethod def _scalar(value): cdef: Scalar scalar if isinstance(value, Scalar): scalar = value else: scalar = lib.scalar(value) return Expression.wrap(CMakeScalarExpression(scalar.unwrap())) _deserialize = Expression._deserialize cdef CExpression _true = CMakeScalarExpression( make_shared[CBooleanScalar](True) ) cdef CExpression _bind(Expression filter, Schema schema) except *: assert schema is not None if filter is None: return _true return GetResultValue(filter.unwrap().Bind( deref(pyarrow_unwrap_schema(schema).get()))) cdef class ScalarUdfContext: """ Per-invocation function context/state. This object will always be the first argument to a user-defined function. It should not be used outside of a call to the function. """ def __init__(self): raise TypeError("Do not call {}'s constructor directly" .format(self.__class__.__name__)) cdef void init(self, const CScalarUdfContext &c_context): self.c_context = c_context @property def batch_length(self): """ The common length of all input arguments (int). In the case that all arguments are scalars, this value is used to pass the "actual length" of the arguments, e.g. because the scalar values are encoding a column with a constant value. """ return self.c_context.batch_length @property def memory_pool(self): """ A memory pool for allocations (:class:`MemoryPool`). This is the memory pool supplied by the user when they invoked the function and it should be used in any calls to arrow that the UDF makes if that call accepts a memory_pool. """ return box_memory_pool(self.c_context.pool) cdef inline CFunctionDoc _make_function_doc(dict func_doc) except *: """ Helper function to generate the FunctionDoc This function accepts a dictionary and expects the summary(str), description(str) and arg_names(List[str]) keys. """ cdef: CFunctionDoc f_doc vector[c_string] c_arg_names f_doc.summary = tobytes(func_doc["summary"]) f_doc.description = tobytes(func_doc["description"]) for arg_name in func_doc["arg_names"]: c_arg_names.push_back(tobytes(arg_name)) f_doc.arg_names = c_arg_names # UDFOptions integration: # TODO: https://issues.apache.org/jira/browse/ARROW-16041 f_doc.options_class = b"" f_doc.options_required = False return f_doc cdef object box_scalar_udf_context(const CScalarUdfContext& c_context): cdef ScalarUdfContext context = ScalarUdfContext.__new__(ScalarUdfContext) context.init(c_context) return context cdef _scalar_udf_callback(user_function, const CScalarUdfContext& c_context, inputs): """ Helper callback function used to wrap the ScalarUdfContext from Python to C++ execution. """ context = box_scalar_udf_context(c_context) return user_function(context, *inputs) def _get_scalar_udf_context(memory_pool, batch_length): cdef CScalarUdfContext c_context c_context.pool = maybe_unbox_memory_pool(memory_pool) c_context.batch_length = batch_length context = box_scalar_udf_context(c_context) return context def register_scalar_function(func, function_name, function_doc, in_types, out_type): """ Register a user-defined scalar function. A scalar function is a function that executes elementwise operations on arrays or scalars, i.e. a scalar function must be computed row-by-row with no state where each output row is computed only from its corresponding input row. In other words, all argument arrays have the same length, and the output array is of the same length as the arguments. Scalar functions are the only functions allowed in query engine expressions. Parameters ---------- func : callable A callable implementing the user-defined function. The first argument is the context argument of type ScalarUdfContext. Then, it must take arguments equal to the number of in_types defined. It must return an Array or Scalar matching the out_type. It must return a Scalar if all arguments are scalar, else it must return an Array. To define a varargs function, pass a callable that takes varargs. The last in_type will be the type of all varargs arguments. function_name : str Name of the function. This name must be globally unique. function_doc : dict A dictionary object with keys "summary" (str), and "description" (str). in_types : Dict[str, DataType] A dictionary mapping function argument names to their respective DataType. The argument names will be used to generate documentation for the function. The number of arguments specified here determines the function arity. out_type : DataType Output type of the function. Examples -------- >>> import pyarrow as pa >>> import pyarrow.compute as pc >>> >>> func_doc = {} >>> func_doc["summary"] = "simple udf" >>> func_doc["description"] = "add a constant to a scalar" >>> >>> def add_constant(ctx, array): ... return pc.add(array, 1, memory_pool=ctx.memory_pool) >>> >>> func_name = "py_add_func" >>> in_types = {"array": pa.int64()} >>> out_type = pa.int64() >>> pc.register_scalar_function(add_constant, func_name, func_doc, ... in_types, out_type) >>> >>> func = pc.get_function(func_name) >>> func.name 'py_add_func' >>> answer = pc.call_function(func_name, [pa.array([20])]) >>> answer [ 21 ] """ cdef: c_string c_func_name CArity c_arity CFunctionDoc c_func_doc vector[shared_ptr[CDataType]] c_in_types PyObject* c_function shared_ptr[CDataType] c_out_type CScalarUdfOptions c_options if callable(func): c_function = func else: raise TypeError("func must be a callable") c_func_name = tobytes(function_name) func_spec = inspect.getfullargspec(func) num_args = -1 if isinstance(in_types, dict): for in_type in in_types.values(): c_in_types.push_back( pyarrow_unwrap_data_type(ensure_type(in_type))) function_doc["arg_names"] = in_types.keys() num_args = len(in_types) else: raise TypeError( "in_types must be a dictionary of DataType") c_arity = CArity( num_args, func_spec.varargs) if "summary" not in function_doc: raise ValueError("Function doc must contain a summary") if "description" not in function_doc: raise ValueError("Function doc must contain a description") if "arg_names" not in function_doc: raise ValueError("Function doc must contain arg_names") c_func_doc = _make_function_doc(function_doc) c_out_type = pyarrow_unwrap_data_type(ensure_type(out_type)) c_options.func_name = c_func_name c_options.arity = c_arity c_options.func_doc = c_func_doc c_options.input_types = c_in_types c_options.output_type = c_out_type check_status(RegisterScalarFunction(c_function, &_scalar_udf_callback, c_options))