Python3 中 Pymysql 的易用封装

如题

安装 pymysql
1
pip install pymysql

执行 excute

1
2
3
4
5
6
7
8
9
def execute(self, sql_cmd):
try:
self._getCursor().execute(sql_cmd)
self._conn.commit()
data = self._getCursor().fetchall()
return data
except Exception:
self._conn.rollback()
return None

插入 Insert

1
2
3
4
5
6
7
8
9
10
11
12
def insert(self, table, ins_data):
sql = ''
for data in ins_data:
key = ','.join(data.keys())
values = map(self._deal_values, data.values())
ins_data = ', '.join(values)
sql = "INSERT INTO {table}({key}) VALUES ({val})".format(
table=table,
key=key,
val=ins_data
)
return self.execute(sql)

删除 Delete

1
2
3
4
5
6
7
8
9
def delete(self, table, condition={}):
condition_list = self._deal_values(condition)
condition_data = ' and '.join(condition_list)
sql = "DELETE FROM {table} {where} {condition}".format(
table=table,
where='' if condition_data == '' else 'WHERE',
condition=condition_data
)
return self.execute(sql)

查询 Select

条件查询

1
2
3
def select(self, table, filed, value):
sql = "SELECT * FROM {table} WHERE {filed} = '{value}'".format(table=table, filed=filed, value=value)
return self.execute(sql)

查询全部

1
2
3
def select_all(self, table):
sql = "SELECT * FROM {table}".format(table=table)
return self.execute(sql)

修改 Update

1
2
3
4
5
6
7
8
9
10
def update(self, table, data, condition=None):
update_list = self._deal_values(data)
update_data = ",".join(update_list)
if condition is not None:
condition_list = self._deal_values(condition)
condition_data = ' AND '.join(condition_list)
sql = "UPDATE {table} SET {values} WHERE {condition}".format(table=table, values=update_data, condition=condition_data)
else:
sql = "UPDATE {table} SET {values}".format(table=table, values=update_data)
return self.execute(sql)

SQLUtils 类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import pymysql

class SQLUtils:
_conn = None
_cursor = None

def __init__(self, db_host=_config['SQL']['host'], db_user=_config['SQL']['user'], db_password=_config['SQL']['password'], db_name=_config['SQL']['database']):
try:
self._conn = pymysql.connect(db_host, db_user, db_password, db_name)
self._cursor = self._conn.cursor()
if self._conn is None or self._cursor is None:
raise Exception('Error while connect to database.')
except Exception as e:
raise e

def __del__(self):
self._cursor.close()
self._cursor = None
self._conn.close()
self._conn = None

def _getConnection(self):
if self._conn is not None:
return self._conn

def _getCursor(self):
if self._cursor is not None:
return self._cursor

def _deal_values(self, value):
if isinstance(value, str):
value = ("'{value}'".format(value=value))
elif isinstance(value, dict):
result = []
for key, value in value.items():
value = self._deal_values(value)
res = "{key}={value}".format(key=key, value=value)
result.append(res)
return result
else:
value = (str(value))
return value

def execute(self, sql_cmd):
try:
self._getCursor().execute(sql_cmd)
self._conn.commit()
data = self._getCursor().fetchall()
return data
except Exception:
self._conn.rollback()
return None

def insert(self, table, ins_data):
sql = ''
for data in ins_data:
key = ','.join(data.keys())
values = map(self._deal_values, data.values())
ins_data = ', '.join(values)
sql = "INSERT INTO {table}({key}) VALUES ({val})".format(
table=table,
key=key,
val=ins_data
)
return self.execute(sql)

def update(self, table, data, condition=None):
update_list = self._deal_values(data)
update_data = ",".join(update_list)
if condition is not None:
condition_list = self._deal_values(condition)
condition_data = ' AND '.join(condition_list)
sql = "UPDATE {table} SET {values} WHERE {condition}".format(table=table, values=update_data, condition=condition_data)
else:
sql = "UPDATE {table} SET {values}".format(table=table, values=update_data)
return self.execute(sql)

def select(self, table, filed, value):
sql = "SELECT * FROM {table} WHERE {filed} = '{value}'".format(table=table, filed=filed, value=value)
return self.execute(sql)

def select_all(self, table):
sql = "SELECT * FROM {table}".format(table=table)
return self.execute(sql)

def delete(self, table, condition={}):
condition_list = self._deal_values(condition)
condition_data = ' AND '.join(condition_list)
sql = "DELETE FROM {table} {where} {condition}".format(
table=table,
where='' if condition_data == '' else 'WHERE',
condition=condition_data
)
return self.execute(sql)

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×