How to implement user-defined functions in Acero

In my opinion, Apache Arrow is the de facto standard for in-memory data processing on modern hardware such as CPUs and GPUs. It not only provides a modern specification for columnar data layout, but also has all the features an up-to-date data processing framework should provide: data transfer over gRPC (Arrow Flight), an extensible library of processing nodes, a sound design for stream processing, and support for modern serialization formats like ORC and Parquet.

Thanks to its clear specification and a core written in standard C++, it can easily be integrated into frameworks like Apache Spark to speed up data transfer between different systems. Unlike many other approaches, such as pandas in Python or the tidyverse in R, it is language-agnostic and can serve as an excellent foundation for your own applications or tailor-made data pipelines.

Implementing a user-defined function

Although Arrow is a great package, the documentation of some parts could be improved. Recently, I faced the challenge of implementing a user-defined function for Acero, Arrow's streaming execution engine. As it was quite difficult to find up-to-date documentation on how to process UTF-8 data, I'm going to share my approach here.

To implement this in a straightforward way, we need some classes that are only available internally to the framework. Therefore, we extract the class StringTransformBase, which is already part of the Arrow C++ compute code (in arrow/compute/kernels/scalar_string_internal.h) but is not exported in the public headers.

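#include <cstdint>

#include <arrow/compute/api.h>
#include <arrow/status.h>

namespace cp = arrow::compute;

// Copied from Arrow's internal string kernel helpers (not part of the public API).
struct StringTransformBase {
    virtual ~StringTransformBase() = default;

    // Hook that runs once per batch before any string is transformed.
    virtual arrow::Status PreExec(cp::KernelContext* ctx,
                                  const cp::ExecSpan& batch,
                                  cp::ExecResult* out) {
        return arrow::Status::OK();
    }

    // Return the maximum total size of the output in codeunits (i.e. bytes)
    // given input characteristics.
    virtual int64_t MaxCodeunits(int64_t ninputs, int64_t input_ncodeunits) {
        return input_ncodeunits;
    }

    // Error reported when Transform() signals invalid input (negative return).
    virtual arrow::Status InvalidInputSequence() {
        return arrow::Status::Invalid("Invalid UTF8 sequence in input");
    }

    // Placeholder: derived classes define their own Transform(), which writes
    // the result to `output` and returns the number of bytes written.
    int64_t Transform(const uint8_t* input, int64_t input_string_ncodeunits,
                      uint8_t* output) {
        return 0;
    }
};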
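
To make the role of these hooks more concrete, here is a simplified, standalone model of the loop that Arrow's internal StringTransformExec runs over a string array. It is only a sketch under stated assumptions: Arrow's buffers, null bitmaps and memory pool are replaced by std::vector and std::string, and ModelStringArray, ApplyTransform and AsciiUpperTransform are hypothetical names. What it is meant to reflect is the contract: the output buffer is sized once via MaxCodeunits(), Transform() writes each string directly into that buffer and returns the number of bytes written, and a negative return value is reported as invalid input.

```cpp
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for an Arrow utf8 array: one contiguous data buffer
// plus length + 1 offsets. Nulls are omitted for brevity.
struct ModelStringArray {
    std::vector<int32_t> offsets{0};
    std::string data;

    void Append(const std::string& s) {
        data += s;
        offsets.push_back(static_cast<int32_t>(data.size()));
    }
    int64_t length() const { return static_cast<int64_t>(offsets.size()) - 1; }
    std::string Value(int64_t i) const {
        return data.substr(static_cast<std::size_t>(offsets[i]),
                           static_cast<std::size_t>(offsets[i + 1] - offsets[i]));
    }
};

// Hypothetical transform following the StringTransformBase contract.
struct AsciiUpperTransform {
    int64_t MaxCodeunits(int64_t /*ninputs*/, int64_t input_ncodeunits) {
        return input_ncodeunits;  // upper-casing ASCII never changes the length
    }
    int64_t Transform(const uint8_t* input, int64_t input_string_ncodeunits,
                      uint8_t* output) {
        for (int64_t i = 0; i < input_string_ncodeunits; ++i) {
            const uint8_t c = input[i];
            if (c >= 0x80) return -1;  // treat non-ASCII bytes as invalid input
            output[i] = (c >= 'a' && c <= 'z') ? static_cast<uint8_t>(c - ('a' - 'A')) : c;
        }
        return input_string_ncodeunits;
    }
};

// Simplified model of the exec loop: allocate the output once, let the
// transform write each value in place, and record the running offsets.
template <typename TransformT>
ModelStringArray ApplyTransform(const ModelStringArray& in, TransformT transform) {
    const int64_t max_out =
        transform.MaxCodeunits(in.length(), static_cast<int64_t>(in.data.size()));
    std::vector<uint8_t> out_buf(static_cast<std::size_t>(max_out));
    const auto* in_bytes = reinterpret_cast<const uint8_t*>(in.data.data());

    ModelStringArray out;
    int64_t written = 0;
    for (int64_t i = 0; i < in.length(); ++i) {
        const int64_t n = transform.Transform(in_bytes + in.offsets[i],
                                              in.offsets[i + 1] - in.offsets[i],
                                              out_buf.data() + written);
        if (n < 0) {
            // Arrow would return transform->InvalidInputSequence(); the model throws.
            throw std::invalid_argument("Invalid UTF8 sequence in input");
        }
        written += n;
        out.offsets.push_back(static_cast<int32_t>(written));
    }
    out.data.assign(out_buf.begin(), out_buf.begin() + written);
    return out;
}
```

Because the buffer is allocated only once, MaxCodeunits() must be a true upper bound; a transform that can grow its input has to override it.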

After that, we can subclass this base class for our own transformation, in this case parsing a UTF-8 string with Boost.URL and extracting its host component:

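#include <cstring>
#include <string>
#include <string_view>

#include <boost/url.hpp>

struct URLParseTransform : StringTransformBase {
    // Writes the host of the URL in `input` to `output` and returns its length
    // in bytes. The decoded host is never longer than the input string, so the
    // inherited MaxCodeunits() (output size == input size) is sufficient.
    int64_t Transform(const uint8_t* input, int64_t input_string_ncodeunits,
                      uint8_t* output) {
        std::string_view s{reinterpret_cast<const char*>(input),
                           static_cast<std::size_t>(input_string_ncodeunits)};

        boost::system::result<boost::urls::url_view> r = boost::urls::parse_uri(s);
        if (r.has_value()) {
            boost::urls::url_view u = r.value();

            // host() returns the percent-decoded host as a std::string.
            std::string host = u.host();

            std::memcpy(output, host.data(), host.size());
            return static_cast<int64_t>(host.size());
        }
        // Unparsable input becomes an empty string; a negative return value
        // would instead make the kernel report InvalidInputSequence().
        return 0;
    }
};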
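
If you want to unit-test the Transform() contract without linking Boost.URL, a deliberately naive stand-in can help. The NaiveHostTransform below is hypothetical and is not a replacement for Boost.URL: it does not implement RFC 3986, does no percent-decoding and does not handle IPv6 literals. It only shows that Transform() copies the host bytes into `output`, returns their count, and never writes more bytes than the input contains, which is what the inherited MaxCodeunits() relies on.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string_view>

// Hypothetical, Boost-free stand-in for URLParseTransform::Transform.
// Expects "scheme://[userinfo@]host[:port][/path][?query][#fragment]".
struct NaiveHostTransform {
    int64_t Transform(const uint8_t* input, int64_t input_string_ncodeunits,
                      uint8_t* output) {
        std::string_view s{reinterpret_cast<const char*>(input),
                           static_cast<std::size_t>(input_string_ncodeunits)};
        const std::size_t sep = s.find("://");
        if (sep == std::string_view::npos) return 0;  // no authority: empty result

        std::string_view authority = s.substr(sep + 3);
        authority = authority.substr(0, authority.find_first_of("/?#"));
        const std::size_t at = authority.rfind('@');
        if (at != std::string_view::npos) authority.remove_prefix(at + 1);  // drop userinfo

        const std::string_view host = authority.substr(0, authority.find(':'));  // drop port
        std::memcpy(output, host.data(), host.size());
        return static_cast<int64_t>(host.size());
    }
};
```

The host is always a substring of the input, so the output can never exceed the input length.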

If you want the function to accept options as well, you can define them by subclassing cp::FunctionOptions, as in this snippet:

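// URLParseOptionsExtract (an enum containing at least HOST) and
// GetURLParseOptionsType() (returning a const cp::FunctionOptionsType*)
// are not shown here.
class URLParseOptions : public cp::FunctionOptions {

public:
    // Which part of the URL the function should return.
    URLParseOptionsExtract extract = HOST;

    URLParseOptions(URLParseOptionsExtract _extract = HOST)
        : cp::FunctionOptions(GetURLParseOptionsType()), extract(_extract) {}
};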
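
To illustrate how the `extract` option could steer the transformation, here is a standalone sketch. Only HOST appears above; the SCHEME and PATH enumerators, the URLParts struct, the SplitURL() helper and URLExtractTransform are hypothetical, and the parsing is naive (no Boost.URL, no userinfo or IPv6 handling). In Arrow, the options would typically reach the transform through the kernel state created by the kernel's init function; this sketch simply passes the option to the constructor.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string_view>

// Hypothetical extension of the enum; only HOST is part of the original.
enum URLParseOptionsExtract { HOST, SCHEME, PATH };

struct URLParts {
    std::string_view scheme, host, path;
};

// Naive splitter for "scheme://host[:port][/path][?query][#fragment]".
URLParts SplitURL(std::string_view s) {
    URLParts parts;
    const std::size_t sep = s.find("://");
    if (sep == std::string_view::npos) return parts;  // all parts stay empty
    parts.scheme = s.substr(0, sep);
    const std::string_view rest = s.substr(sep + 3);
    const std::size_t path_start = rest.find_first_of("/?#");
    const std::string_view authority = rest.substr(0, path_start);
    parts.host = authority.substr(0, authority.find(':'));
    if (path_start != std::string_view::npos && rest[path_start] == '/') {
        const std::string_view path = rest.substr(path_start);
        parts.path = path.substr(0, path.find_first_of("?#"));
    }
    return parts;
}

// Transform whose output is selected by the option value.
struct URLExtractTransform {
    explicit URLExtractTransform(URLParseOptionsExtract extract) : extract_(extract) {}

    int64_t Transform(const uint8_t* input, int64_t input_string_ncodeunits,
                      uint8_t* output) {
        const std::string_view s(reinterpret_cast<const char*>(input),
                                 static_cast<std::size_t>(input_string_ncodeunits));
        const URLParts parts = SplitURL(s);
        std::string_view part;
        switch (extract_) {
            case HOST: part = parts.host; break;
            case SCHEME: part = parts.scheme; break;
            case PATH: part = parts.path; break;
        }
        if (!part.empty()) std::memcpy(output, part.data(), part.size());
        return static_cast<int64_t>(part.size());
    }

  private:
    URLParseOptionsExtract extract_;
};
```

Every selectable part is a substring of the input, so the default MaxCodeunits() bound still holds regardless of the option.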

Registering the function

The only thing left to do is to wrap the transform in a kernel with an executor and register it as a function, together with its options type, in the global function registry, so that Acero can pick it up when you define custom execution plans:

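arrow::Status RegisterCustomFunctions() {
    // Illustrative documentation: summary, description, argument names,
    // options class name.
    static const cp::FunctionDoc func_doc{
        "Extract a component from a URL",
        "The component to extract is selected via URLParseOptions.",
        {"strings"},
        "URLParseOptions"};

    auto func = std::make_shared<cp::ScalarFunction>("url_extract",
                                                     cp::Arity::Unary(),
                                                     func_doc);

    // utf8 -> utf8 kernel. Exec is the static kernel entry point (Execute is
    // the internal helper that takes a transform instance); State::Init
    // initializes the kernel state holding the options.
    cp::ScalarKernel kernel({arrow::utf8()},
                            arrow::utf8(),
                            StringTransformExec<...>::Exec,
                            StringTransformExec<...>::State::Init);

    // An output value is null whenever the input value is null; for a utf8
    // output, the string buffers are allocated by the exec function itself.
    kernel.mem_allocation = cp::MemAllocation::PREALLOCATE;
    kernel.null_handling = cp::NullHandling::INTERSECTION;

    ARROW_RETURN_NOT_OK(func->AddKernel(std::move(kernel)));

    auto registry = cp::GetFunctionRegistry();
    ARROW_RETURN_NOT_OK(registry->AddFunction(std::move(func)));
    ARROW_RETURN_NOT_OK(registry->AddFunctionOptionsType(GetURLParseOptionsType()));
    return arrow::Status::OK();
}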
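
Conceptually, the registry is a name-keyed lookup table: the name passed to cp::ScalarFunction ("url_extract") is what plans and expressions refer to later. One practical consequence is that Arrow's FunctionRegistry::AddFunction takes an allow_overwrite flag that defaults to false, so calling RegisterCustomFunctions() a second time in the same process returns an error status. The toy model below (ToyRegistry and GlobalToyRegistry are hypothetical, std-only, and far simpler than Arrow's class) only illustrates that lookup-by-name and duplicate-name behaviour.

```cpp
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <string_view>
#include <utility>

// Toy model of a name-keyed function registry (not Arrow's FunctionRegistry).
class ToyRegistry {
  public:
    using Fn = std::function<std::string(std::string_view)>;

    // Like AddFunction(func, allow_overwrite = false): a second registration
    // under the same name is rejected unless overwriting is requested.
    bool AddFunction(const std::string& name, Fn fn, bool allow_overwrite = false) {
        if (!allow_overwrite && functions_.count(name) > 0) return false;
        functions_[name] = std::move(fn);
        return true;
    }

    // Looks a function up by name and applies it; empty if the name is unknown.
    std::optional<std::string> Call(const std::string& name, std::string_view arg) const {
        auto it = functions_.find(name);
        if (it == functions_.end()) return std::nullopt;
        return it->second(arg);
    }

  private:
    std::map<std::string, Fn> functions_;
};

// Process-wide instance, similar in spirit to cp::GetFunctionRegistry().
ToyRegistry& GlobalToyRegistry() {
    static ToyRegistry registry;
    return registry;
}
```

If your registration code may run more than once, guard it (for example with std::call_once) rather than relying on overwriting.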

StringTransformExec is, like StringTransformBase, an internal class of Arrow's compute kernels and can simply be copied verbatim into your project. So nothing too fancy; it just takes some time to set everything up correctly. For the complete source code, have a look at this repository on GitHub.