2010年9月20日星期一

Mysql UDF (转贴)

最近公司在做个SNS子项目,需要把交易系统中的交易数据实时地发送到SNS子系统中。最自然的设计是修改各个交易模块,将数据向SNS系统传输。但是在评审和开发的时候遇到了不小的阻力。理由很简单,这些修改直接影响系统的核心交易模块,引入了一定的风险。综合考虑后,决定增加个类似于Memcached Functions for MySQL的备选方案。SNS子系统虽然大量地使用了cache,但是没有使用Memcached或EHCache,原因是这些cache实现无法满足所有的需求。最初的想法是在UDF中使用ActiveMQ CPP,将消息直接发送到SNS子系统的cache中。不过最终还是放弃了这种比较激进的做法,而是在UDF中将数据通过socket发送到某个Java程序,然后再对数据进行分类处理后,转发给SNS cache。



关于user-defined function(UDF),在MySQL的官方文档上有比较详细的说明。为了使用UDF,必须要动态链接mysqld,也就是不能使用--with-mysqld-ldflags=-all-static,而是应该使用--with-mysqld-ldflags=-rdynamic。UDF通常需要用C/C++编写,如果要编写一个名为xxx的UDF,那么需要定义如下的C/C++方法:



  • xxx() (required)。这个方法的返回值就是UDF的结果。SQL数据类型和C/C++类型之间的对应关系是:varchar char *;INTEGER long long;REAL double;DECIMAL char *等。

  • xxx_init() (optional)。xxx()对应的初始化方法。该方法通常用来检查xxx()方法的参数(或者进行参数类型的转换),分配内存,指定返回值的最大长度等。

  • xxx_deinit() (optional)。xxx()对应的清理方法。在这个方法中应该释放之前分配的内存。



当在SQL中调用名为XXX()的UDF时,MySQL会首先尝试调用名为xxx_init() 的方法,如果该方法返回false,那么MySQL会终止SQL的执行,并返回一个error message(在xxx_init()中保存在message参数中的以null结尾的字符串,最大长度为 MYSQL_ERRMSG_SIZE)。否则MySQL会对每个row调用xxx() 方法。在所有的row都被处理后,MySQL会调用xxx_deinit() 方法来执行相应的清理工作。对于那些象SUM()之类的聚集函数,还有一些其他的C/C++函数需要编写。需要注意的是,如果采用C++编写UDF,由于C++的"name mangling"会导致MySQL无法找到对应的C++函数,因此需要将函数声明包含在extern "C" { ... }中。以下是这些函数的例子:


my_bool xxx_init(UDF_INIT *initid, UDF_ARGS *args, char *message);

char *xxx(UDF_INIT *initid, UDF_ARGS *args, char *result, unsigned long *length, char *is_null, char *error);

void xxx_deinit(UDF_INIT *initid);


initid参数会被传递到所有三个函数中(用于在这三个函数中共享数据),它指向一个UDF_INIT结构体。关于该结构体的成员,可以参考MySQL的在线文档。args参数指向一个UDF_ARGS结构体,它有如下成员:



  • unsigned int arg_count。参数的个数。

  • enum Item_result *arg_type。参数的类型。可选值有STRING_RESULT, INT_RESULT, REAL_RESULT, and DECIMAL_RESULT。DECIMAL_RESULT类型的值是以char *的形式传入到函数中的,因此处理方式同STRING_RESULT类型参数。如果xxx_init()中对该成员赋值,例如args->arg_type[0] = STRING_RESULT;那么MySQl会在调用xxx()是对 相应的参数进行强制地类型转换。

  • char **args。参数值。如果参数值是NULL,那么args->args[i]是一个空指针(0)。对于STRING_RESUL类型的参数,那么可以通过 args->args[i]访问该字符串,字符串的长度是args->lengths[i](不要假定该字符串是以null结尾的)。对于INT_RESULT类型的参数,通过long long int_val = *((long long*) args->args[i]);进行访问。对于REAL_RESULT类型参数,通过double real_val = *((double*) args->args[i]);进行访问。对于DECIMAL_RESULT类型参数,处理方式同STRING_RESULT。


  • unsigned long *lengths。在xxx_init()函数中,其值是字符串参数的最大长度。在xxx()函数中,其值是字符串参数的实际长度。对于INT_RESULT 或者REAL_RESULT类型参数,其值始终是最大可能值。

  • char *maybe_null。在xxx_init()函数中,其值表明该参数是否可以为null。


  • char **attributes。参数名。默认情况下是SQL中的表达式文本(同样,不要假定该字符串是以null结尾的)。

  • unsigned long *attribute_lengths。参数名的字符串长度。


以下是个UDF C代码的片段:


#include <stdio.h>
#include <stdlib.h>

#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <pthread.h>
#include <arpa/inet.h>
#include <sys/socket.h>

#include "mysql.h"

///////////////////////////////////////////////////
extern "C" {
//
my_bool send_message_open_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
char * send_message_open(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
void send_message_open_deinit(UDF_INIT *initid);

//
my_bool send_message_close_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
char * send_message_close(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
void send_message_close_deinit(UDF_INIT *initid);

//
my_bool send_message_init(UDF_INIT *initid, UDF_ARGS *args, char *message);
char * send_message(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error);
void send_message_deinit(UDF_INIT *initid);
}

////////////////////////////////////////////////////
// Open
////////////////////////////////////////////////////
my_bool send_message_open_init(UDF_INIT *initid, UDF_ARGS *args, char *message) {
// Validation
if(args->arg_count != 2) {
strcpy(message,"usage: select send_message_open('10.15.3.68', 7777)");
return -1;
} else if(args->arg_type[0] != STRING_RESULT || args->arg_type[1] != INT_RESULT){
strcpy(message,"usage: select send_message_open('10.15.3.68', 7777)");
return -1;
} else {
return 0;
}
}

char * send_message_open(UDF_INIT *initid, UDF_ARGS *args, char* result, unsigned long *length, char *is_null, char *error){
...
}

void send_message_open_deinit(UDF_INIT *initid) {
}

...


编译:


g++ -I/usr/local/mysql/include/mysql/ -shared -o send_message.so send_message.c


编译后需要将so文件拷贝到MySQL可以加载的位置,如:

cp send_message.so /usr/local/mysql6320/lib/mysql/plugin/




在MySQL中创建UDF:


DROP FUNCTION IF EXISTS send_message_open;
DROP FUNCTION IF EXISTS send_message_close;
DROP FUNCTION IF EXISTS send_message;
CREATE FUNCTION send_message_open RETURNS STRING SONAME 'send_message.so';
CREATE FUNCTION send_message_close RETURNS STRING SONAME 'send_message.so';
CREATE FUNCTION send_message RETURNS STRING SONAME 'send_message.so';


测试:


select send_message_open('10.15.3.68', 7777);
select send_message('test message');
select send_message_close();

没有评论: