重庆分公司,新征程启航
为企业提供网站建设、域名注册、服务器等服务
为了进行复杂信息的存储和查询,服务端系统往往需要数据库操作。数据库分为关系型数据库和非关系型数据库,关系型数据库有MySQL、Oracle、SQL Server等,非关系型数据库有redis(常用来做缓存)、MongoDB等。MySQL是目前很流行的数据库,本文将要介绍如何在node服务中进行MySQL数据库操作。
成都创新互联公司专业提供成都主机托管四川主机托管成都服务器托管四川服务器托管,支持按月付款!我们的承诺:贵族品质、平民价格,机房位于中国电信/网通/移动机房,多线服务器托管服务有保障!
npm install mysql --save
或者
yarn add mysql
要想进行数据库操作就需要和数据库建立连接,然后通过连接进行数据库的操作。MySQL的数据库连接方式有以下几种:
mysqljs文档中推荐使用第一种方式:每次请求建立一个连接,但是由于频繁的建立、关闭数据库连接,会极大的降低系统的性能,所以我选择了使用连接池的方式,如果对性能有更高的要求,安装了MySQL 集群,可以选择使用连接池集群。
将数据库相关的配置添加到公用的配置文件中,方便项目的初始化。
module.exports = {
…
// mysql数据库配置
mysql: {
// 主机
host: 'localhost',
// 端口
port: 3306,
// 用户名
user: 'root',
// 密码
password: '123456',
// 数据库名
database: 'server-demo',
// 连接池允许创建的最大连接数,默认值为10
connectionLimit: 50,
// 允许挂起的最大连接数,默认值为0,代表挂起的连接数无限制
queueLimit: 0
}
};
connectionLimit 和 queueLimit 是数据连接池特有的配置项。
/**
* 数据库连接池
*/
const mysql = require('mysql');
const config = require('../config');
// 创建数据库连接池
const pool = mysql.createPool(config.mysql);
pool.on('acquire', function (connection) {
console.log(`获取数据库连接 [${connection.threadId}]`);
});
pool.on('connection', function (connection) {
console.log(`创建数据库连接 [${connection.threadId}]`);
});
pool.on('enqueue', function () {
console.log('正在等待可用数据库连接');
});
pool.on('release', function (connection) {
console.log(`数据库连接 [${connection.threadId}] 已释放`);
});
module.exports = pool;
创建数据库连接池pool后,就可以通过pool获取数据库连接了,另外通过监听连接池的事件可以了解连接池中连接的使用情况。
如果将connectionLimit 设为2,queueLimit 设为0,当同时有5个请求获取数据库连接时,线程池的事件日志如下:
正在等待可用数据库连接
正在等待可用数据库连接
正在等待可用数据库连接
创建数据库连接 [1011]
获取数据库连接 [1011]
数据库连接 [1011] 已释放
获取数据库连接 [1011]
创建数据库连接 [1012]
获取数据库连接 [1012]
数据库连接 [1011] 已释放
获取数据库连接 [1011]
数据库连接 [1012] 已释放
获取数据库连接 [1012]
数据库连接 [1011] 已释放
数据库连接 [1012] 已释放
由于线程池允许的最大连接数是2,5个请求中会有2个请求能够得到连接,另外3个请求挂起等待可用连接。由于创建数据库连接的代价比较大,线程池在创建连接时采用懒汉式,也就是,用到时才创建。先得到连接的请求在完成操作后释放连接,放回到连接池,然后挂起的请求从线程池取出空闲的连接进行操作。
由于mysql 模块的接口都为回调方式的,为了操作方便简单地将接口封装为Promise,相关方法封装如下:
const pool = require('./pool');
// 获取连接
function getConnection () {
return new Promise((resolve, reject) => {
pool.getConnection((err, connection) => {
if (err) {
console.error('获取数据库连接失败!', err)
reject(err);
} else {
resolve(connection);
}
});
});
}
// 开始数据库事务
function beginTransaction (connection) {
return new Promise((resolve, reject) => {
connection.beginTransaction(err => {
if (err) {
reject(err);
} else {
resolve();
}
});
});
}
// 提交数据库操作
function commit (connection) {
return new Promise((resolve, reject) => {
connection.commit(err => {
if (err) {
reject(err);
} else {
resolve();
}
});
})
}
// 回滚数据库操作
function rollback (connection) {
return new Promise((resolve, reject) => {
connection.rollback(err => {
if (err) {
reject(err);
} else {
resolve();
}
});
})
}
对于不需要使用事务的普通操作,获取数据库连接connection后,使用connection进行数据库操作,完成后释放连接到连接池,则执行完成一次操作。
/**
* 执行数据库操作【适用于不需要事务的查询以及单条的增、删、改操作】
* 示例:
* let func = async function(conn, projectId, memberId) { ... };
* await execute( func, projectId, memberId);
* @param func 具体的数据库操作异步方法(第一个参数必须为数据库连接对象connection)
* @param params func方法的参数(不包含第一个参数 connection)
* @returns {Promise.<*>} func方法执行后的返回值
*/
async function execute (func, ...params) {
let connection = null;
try {
connection = await getConnection()
let result = await func(connection, ...params);
return result
} finally {
connection && connection.release && connection.release();
}
}
对于很多业务都需要执行事务操作,例如:银行转账,A账户转账给B账户 100元,这个业务操作需要执行两步,从A账户减去100元,然后给B账户增加100元。两个子操作必须全部执行成功才能完成完整的业务操作,如果任意子操作执行失败就需要撤销之前的操作,进行回滚。
对于需要使用事务的操作,获取数据库连接connection后,首先需要调用connection.beginTransaction() 开始事务,然后使用connection进行多步操作,完成后执行connection.commit() 进行提交,则执行完成一次事务操作。如果在执行过程中出现了异常,则执行connection.rollback() 进行回滚操作。
/**
* 执行数据库事务操作【适用于增、删、改多个操作的执行,如果中间数据操作出现异常则之前的数据库操作全部回滚】
* 示例:
* let func = async function(conn) { ... };
* await executeTransaction(func);
* @param func 具体的数据库操作异步方法(第一个参数必须为数据库连接对象connection)
* @returns {Promise.<*>} func方法执行后的返回值
*/
async function executeTransaction(func) {
const connection = await getConnection();
await beginTransaction(connection);
let result = null;
try {
result = await func(connection);
await commit(connection);
return result
} catch (err) {
console.error('事务执行失败,操作回滚');
await rollback(connection);
throw err;
} finally {
connection && connection.release && connection.release();
}
}
增删改查是处理数据的基本原子操作,将这些操作根据操作的特点进行简单的封装。
/**
* 查询操作
* @param connection 连接
* @param sql SQL语句
* @param val SQL参数
* @returns {Promise} resolve查询到的数据数组
*/
function query (connection, sql, val) {
// console.info('sql执行query操作:\n', sql, '\n', val);
return new Promise((resolve, reject) => {
connection.query(sql, val, (err, rows) => {
if (err) {
console.error('sql执行失败!', sql, '\n', val);
reject(err);
} else {
let results = JSON.parse(JSON.stringify(rows));
resolve(results);
}
});
});
}
/**
* 查询单条数据操作
* @param connection 连接
* @param sql SQL语句
* @param val SQL参数
* @returns {Promise} resolve查询到的数据对象
*/
function queryOne (connection, sql, val) {
return new Promise((resolve, reject) => {
query(connection, sql, val).then(
results => {
let result = results.length > 0 ? results[0] : null;
resolve(result);
},
err => reject(err)
)
});
}
/**
* 新增数据操作
* @param connection 连接
* @param sql SQL语句
* @param val SQL参数
* @param {boolean} skipId 跳过自动添加ID, false: 自动添加id,true: 不添加id
* @returns {Promise} resolve 自动生成的id
*/
function insert (connection, sql, val, skipId) {
let id = val.id;
if (!id && !skipId) {
id = uuid();
val = {id, ...val};
}
return new Promise((resolve, reject) => {
// console.info('sql执行insert操作:\n', sql, '\n', val);
connection.query(sql, val, (err, results) => {
if (err) {
console.error('sql执行失败!', sql, '\n', val);
reject(err);
} else {
resolve(id);
}
});
});
}
/**
* 更新操作
* @param connection 连接
* @param sql SQL语句
* @param val SQL参数
* @returns {Promise} resolve 更新数据的行数
*/
function update (connection, sql, val) {
// console.info('sql执行update操作:\n', sql, '\n', val);
return new Promise((resolve, reject) => {
connection.query(sql, val, (err, results) => {
if (err) {
console.error('sql执行失败!', sql, '\n', val);
reject(err);
} else {
resolve(results.affectedRows);
}
});
});
}
/**
* 删除操作
* @param connection 连接
* @param sql SQL语句
* @param val SQL参数
* @returns {Promise} resolve 删除数据的行数
*/
function del (connection, sql, val) {
// console.info('sql执行delete操作:\n', sql, '\n', val);
return new Promise((resolve, reject) => {
connection.query(sql, val, (err, results) => {
if (err) {
console.error('sql执行失败!', sql, '\n', val);
reject(err);
} else {
// console.log('delete result', results);
resolve(results.affectedRows);
}
});
});
}
将代码分层可以降低代码的耦合度,提高可复用性、可维护性,这里将代码分成了3层: Dao层、Service层和Controller层。
const { query, queryOne, update, insert, del } = require('../db/curd');
class UserDao {
static async queryUserById (connection, id) {
const sql = `SELECT user.id, user.account, user.name, user.email, user.phone,
user.birthday, user.enable, user.deleteFlag, user.creator,
user.createTime, user.updater, user.updateTime
FROM sys_user user
WHERE user.id = ?`;
const user = await queryOne(connection, sql, id);
return user;
}
…
}
module.exports = UserDao;
const { execute, executeTransaction } = require('../db/execute');
const UserDao = require('../dao/userDao');
class UserService {
static async findUserById (id) {
return await execute(UserDao.queryUserById, id);
}
…
}
module.exports = UserService;
对于复杂些的业务逻辑可以使用匿名函数来实现:
static async findUserWithRoles (id) {
return await execute (async connection => {
const user = await UserDao.queryUserById(connection, id);
if (user) {
user.roles = await RoleDao.queryRolesByUserId(connection, id);
}
return user;
});
}
如果要执行事务操作,则需要使用executeTransaction 方法:
static async updateUserRoleRelations (userId, roleIds) {
return await executeTransaction(async connection => {
const relations = await UserDao.queryUserRoleRelations(connection, userId);
const oldRoleIds = relations.map(item => item.roleId);
const newRoleIds = roleIds || [];
// 新增的角色数组
const addList = [];
// 移除的角色数组
const removeList = [];
newRoleIds.forEach(roleId => {
if (oldRoleIds.indexOf(roleId) === -1) {
addList.push(roleId);
}
});
oldRoleIds.forEach(roleId => {
if (newRoleIds.indexOf(roleId) === -1) {
removeList.push(roleId);
}
});
if (addList.length > 0) {
await UserDao.insertUserRoleRelations(connection, userId, addList);
}
if (removeList.length > 0) {
await UserDao.deleteUserRoleRelations(connection, userId, removeList);
}
});
}
const UserService = require('../service/userService');
class UserControler {
static async getUserById (ctx) {
// 用户ID
const id = ctx.params.id;
// 是否包含用户角色信息,如果withRoles 为 "1" 表示需要包含角色信息
const withRoles = ctx.query.withRoles;
let user;
if (withRoles === '1') {
user = await UserService.findUserWithRoles(id);
} else {
user = await UserService.findUserById(id);
}
if (user) {
ctx.body = user;
} else {
ctx.body = {
code: 1004,
msg: '用户不存在!'
}
}
}
…
}
module.exports = UserControler;
此示例基于Koa框架,controller 层实现完成后需要添加路由:
const router = new KoaRouter();
const UserController = require('./controler/userControler');
// 获取指定ID的用户
router.get('/users/:id', UserController.getUserById);
// 获取所有用户
router.get('/users', UserControler.getUsers);
对于Koa框架如何使用,这里不再介绍,路由添加完毕后,启动服务,即可使用这些接口,如果本地服务启动的端口为3000,接口请求地址如下:
本文介绍了mysql模块的基本使用,对其进行了简单封装,并提供了使用示例。除了使用mysql模块来操作数据库,也可以使用mysql2模块,mysql2的基本用法与mysql一致,另外mysql2还支持Promise,使用起来更方便。本文相关的代码已提交到GitHub以供参考,项目地址: https://github.com/liulinsp/node-server-typeorm-demo。
作者:刘琳