MongoDB

一般爬虫使用的数据库,是根据项目来定的。如需求方指定了使用什么数据库、如果没指定,那么决定权就在爬虫程序员手里,如果自选的话,mysql 和mongodb 用的都是比较多的。但不同的数据库品种有各自的优缺点,不同的场景任何一种数据库都可以用来存储,但是某种可能会更好。比如如果抓取的数据之间的耦合性很高,关系比较复杂的话,那么mysql可能会是更好的选择。如果抓取的数据是分版块的,并且它们之间没有相似性或关联性不强,那么可能mongodb 会更好。

官方文档:https://docs.mongodb.com/

中文文档:https://www.mongodb.org.cn/

mongoDB的生态、理念非常先进而且成熟、但是mongoDB不仅有开源版本,还有企业版本。所以有部分公司比较担心,哪天无法使用mongoDB了,所以也会产生一些替代产品。

DynamoDB  : AWS
SequoiaDB : 巨杉数据库

【1】安装

(1)mac安装

在 Mac OS 系统下安装 MongoDB 与在 Linux 下安装比较相似,本节我们就来详细介绍一下 Mac OS 系统下如何安装 MongoDB。

  1. 下载 MongoDB

与在 Linux 系统下安装 MongoDB 相同,首先我们需要在 MongoDB 的官网获得 MongoDB 安装包的下载链接,如下图所示:

MongoDB 官网
图:MongoDB 官网

得到下载链接后,使用cd命令进入 /usr/local 目录,然后使用wget命令下载 MongoDB 的压缩包,命令如下:

cd /usr/local
sudo wget https://fastdl.mongodb.org/osx/mongodb-macos-x86_64-4.4.3.tgz

(2)安装 MongoDB

待压缩包下载完成后就可以尝试安装 MongoDB 了,具体的安装步骤如下:

【步骤 1】解压缩刚刚下载的压缩包,并将其重命名为 mongodb:

sudo tar -zxvf mongodb-macos-x86_64-4.4.3.tgz # 解压 MongoDB 压缩包
sudo mv mongodb-osx-ssl-x86_64-4.0.17/ mongodb # 重命名解压得到的文件夹

【步骤 2】在 /usr/local/mongodb 目录下新建两个文件夹 data 和 log,用于存储 MongoDB 的数据和日志。

sudo mkdir -p /usr/local/mongodb/data
sudo mkdir -p /usr/local/mongodb/log

使用如下命令为当前用户分配目录的读写权限:

sudo chown biancheng /usr/local/mongodb/data
sudo chown biancheng /usr/local/mongodb/log

其中“biancheng”为当前的用户名,您需要修改为您当前系统的用户名。

【步骤 3】配置 PATH。在终端中输入open -e .bash_profile命令打开 bash_profile 配置文件,然后将 MongoDB 的安装目录下的 bin 目录添加到环境变量中,如下所示:

export PATH=${PATH}:/usr/local/mongodb/bin

编辑完成后保存并退出,然后使用source .bash_profile命令使配置立即生效。

【步骤 4】使用下面的命令来启动 MongoDB 服务:

mongod –dbpath /usr/local/mongodb/data –logpath /usr/local/mongodb/log/mongo.log –fork

参数说明如下:

  • –dbpath 用来设置数据的存放目录;
  • –logpath 用来设置日志的存放目录;
  • –fork 用来设置在后台运行。

至此 MongoDB 就安装完成了。

(3)验证安装

您可以使用mongod -version命令来验证 MongoDB 是否安装成功,如果出现类似下面所示的内容,则说明 MongoDB 安装成功。

mongod -version

db version v4.0.10
git version: c389e7f69f637f7a1ac3cc9fae843b635f20b766
allocator: tcmalloc
modules: none
build environment:
distmod: 2008plus-ssl
distarch: x86_64
target_arch: x86_64

(2)win安装

通过前面的介绍我们已经简单的了解了 MongoDB,本节我们来看看如何在 Windows 系统上安装 MongoDB。

下载 MongoDB

要在 Windows 系统上安装 MongoDB,首先需要在 MongoDB 的官网(https://www.mongodb.com/try/download/community)下载 MongoDB 的安装包,如下图所示:

下载 MongoDB 安装包
图:下载 MongoDB 安装包

提示:下载前需要先注册/登陆 MongoDB 官网的账号。

安装 MongoDB

【步骤 1】双击运行我们刚刚下载的 .msi 格式的 MongoDB 安装包,在弹出的窗口种单击 Next,如下图所示:

运行 MongoDB 安装包
图:运行 MongoDB 安装包

【步骤 2】接受用户许可协议,并单击 Next,如下图所示:

接受用户协议
图:接受用户协议

【步骤 3】单击 Custom(自定义)按钮来自定义安装,如下图所示:

自定义安装
图:自定义安装

【步骤 4】修改安装目录,并单击 Next,如下图所示:

自定义安装目录
图:自定义安装目录

【步骤 5】选中“Install MongoD as a Service”,并在下面的选项中选择“Run service as Network Service user”,完成后单击 Next,如下图所示:

安装 Windows 服务
图:安装 Windows 服务

【步骤 6】取消“Install MongoDB Compass”的勾选(当然您也可以选择安装它,但这样就需要花费更久的安装时间),MongoDB Compass 是一个图形界面管理工具,后面如果需要我们也可以再单独下载(https://www.mongodb.com/try/download/compass)和安装它,完成上述操作后单击 Next,如下图所示:

取消“Install MongoDB Compass”的勾选
图:取消“Install MongoDB Compass”的勾选

【步骤 7】单击“Install”按钮开始安装。

开始安装
图:开始安装

【步骤 8】等待安装完成,单击“Finish”按钮退出安装程序即可完成安装。

完成安装
图:完成安装

验证安装

不出意外的话,完成上面的一系列操作后 MongoDB 就成功安装到您的电脑上了。想要验证安装是否成功,您可以打开“服务”,如果能在服务列表中找到 MongoDB Server,就说明 MongoDB 已经安装成功。

MongoDB Server 服务

【2】常用概念

(1)数据库

数据库是用于存储数据的物理容器,每个数据库在文件系统中都有属于自己的文件集。一台 MongoDB 服务器中可以创建多个数据库,并且每个数据库都是独立的,都有属于自己的集合和权限,而且不同数据库中的数据会放置在不同的文件中。

MongoDB 的默认数据库为“test”,该数据库存储在 data 目录中,您可以使用show dbs命令来查看所有的数据库列表,如下所示:

\> show dbs
admin  0.000GB
config 0.000GB
local  0.000GB

注意:在使用show dbs命令时,若数据库中没有存储任何数据,则不会在列表中显示出来,也就是说只有非空数据库才能通过show dbs命令查看。

(2)集合

集合就是一组 MongoDB 文档的组合,类似于关系型数据库(例如 MySQL)中的数据表。集合存在于数据库中,且没有固定的结构,您可以向集合中插入不同格式或类型的数据。

(3)文档

文档是 MongoDB 中数据的基本单位,由 BSON 格式(一种计算机数据交换格式,类似于 JSON)的键/值对组成,类似于关系型数据库中的一行行数据,但要相对复杂一些。

文档具有动态模式,所谓动态模式就是同一集合中的文档不需要具有相同的字段,即使是相同的字段也可以是不同的类型,这与关系型数据库有很大的区别,也是 MongoDB 最突出的特点之一。

下表列举了关系型数据库与 MongoDB 中的一些差异:

| 关系型数据库 | MongoDB | 解释说明 |
| ———— | ———– | —————————————– |
| database | database | 数据库 |
| table | collection | 数据表/集合 |
| row | document | 数据行/文档 |
| column | field | 字段/域 |
| index | index | 索引 |
| table joins | | 表连接,MongoDB 中不支持 |
| primary key | primary key | 主键,MongoDB 会自动将 _id 字段设置为主键 |

【3】数据库操作

> show dbs
admin   0.000GB
config  0.000GB
local   0.000GB
> use blog
switched to db blog
> db
blog
> db.article.insert({"title":"mongodb"})
WriteResult({ "nInserted" : 1 })
> use shop
switched to db shop
> db.goods.insert({"name":"电子商品"})
WriteResult({ "nInserted" : 1 })
> show dbs
admin   0.000GB
blog    0.000GB
config  0.000GB
local   0.000GB
shop    0.000GB
> db.dropDatabase()
{ "dropped" : "shop", "ok" : 1 }

【4】集合操作

MongoDB 的集合就相当于 MySQL 的一个表 table,MySQL 列出的所有表都可以使用 show tables,MongoDB 可以使用 show collections 展示所有集合

> db
shop
> db.createCollection("menu")
{ "ok" : 1 }
> db.menu.insert({"id":"1231231","name":"张三"})
WriteResult({ "nInserted" : 1 })
> db.menu.find()
{ "_id" : ObjectId("6425263f8ae74e12d95c4c7e"), "id" : "1231231", "name" : "张三" }
> db.menu.insert({"id":"1231231","name":"李四","price":199})
WriteResult({ "nInserted" : 1 })
> db.menu.find()
{ "_id" : ObjectId("6425263f8ae74e12d95c4c7e"), "id" : "1231231", "name" : "张三" }
{ "_id" : ObjectId("642526968ae74e12d95c4c7f"), "id" : "1231231", "name" : "李四", "price" : 199 }
> show tables;
menu
> show collections;
menu
> db.menu.drop()
true
> show tables;
> 

【5】文档操作

文档是 MongoDB 中存储的基本单元,是一组有序的键值对集合。文档中存储的文档键的格式必须是符合 UTF-8 标准的字符串。

添加文档

语法

db_name.collection_name.insert(
    <document or array of documents>,
    {
        writeConcern: <document>,    //可选字段
        ordered: <boolean>    //可选字段
    }
)

参数

| 参数 | 描述 |
| —————————— | ———————————————- |
| db_name | 数据库名 |
| collection_name | 集合名 |
| document or array of documents | 表示可设置插入一条或多条文档 |
| writeConcern | 参数表示自定义写出错的级别,是一种出错捕捉机制 |
| ordered | 是可选的,默认为 true |

在插入时,我们既可以指定 _id 的值,如果指定了,则该值必须唯一,如果没有指定,则系统默认生成唯一的值ObjectId。

#插入单条
user1={
    "name":"张三",
    "age":22,
    'hobbies':['music','read','dancing'],
    'addr':{
        'country':'China',
        'city':'BJ'
    }
}

db.user.insert(user1)
db.user.find()

#3、插入多条
user2={
    "name":"李四",
    "age":23,
    'hobbies':['music','read','dancing'],
    'addr':{
        'country':'China',
        'city':'珠海'
    }
}

user3={

    "name":"王五",
    "age":20,
    'hobbies':['music','read'],
    'addr':{
        'country':'China',
        'city':'廊坊'
    }
}

user4={

    "name":"赵六",
    "age":32,
    'hobbies':['read','run'],
    'addr':{
        'country':'China',
        'city':'大连'
    }
}

user5={

    "name":"朱七",
    "age":35,
    'hobbies':['run'],
    'addr':{
        'country':'China',
        'city':'青岛'
    }
}

db.user.insertMany([user2,user3,user4,user5]) 
# db.user.find().pretty()

查看文档

(1) 比较运算 

# SQL:=,!=,>,<,>=,<=
# MongoDB:{key:value}代表什么等于什么,"$ne","$gt","$lt","gte","lte",其中"$ne"能用于所有数据类型

#1、select * from db.user where name = "张三";
db.user.find({'name':'张三'})

#2、select * from db.user where name != "alex";
db.user.find({'name':{"$ne":'张三'}})

#3、select * from db.user where age > 30;
db.user.find({'age':{'$gt':30}})


(2) 逻辑运算 

# SQL:and,or,not
# MongoDB:字典中逗号分隔的多个条件是and关系,"$or"的条件放到[]内,"$not"

db.user.find({'age':{"$gte":20,"$lt":33}})

db.user.find({"addr.city":"青岛","age":{"$gt":30}})

db.user.find({
    "$or":[
        {"addr.city":"青岛"},
        {"age":{"$gt":30}}
        ]
})

#4、select * from db1.user where id % 2=1;
db.user.find({'age':{"$mod":[2,1]}})

#5、上题,取反
db.user.find({'age':{"$not":{"$mod":[2,1]}}})


(3) 成员运算 

# SQL:in,not in
# MongoDB:"$in","$nin"

db.user.find({"age":{"$in":[20,30,40]}})

db.user.find({"name":{"$nin":['张三','']}})


(4) 正则匹配

# SQL: regexp 正则
# MongoDB: /正则表达/i

db.user.find({'name':/^张.*/i})

(5) 取指定字段  

db.user.find({'name':/^张.*/i},{'_id':0,'name':1,'age':1})

(6) 查询数组 

# 查看有dancing爱好的人
db.user.find({'hobbies':'dancing'})

# 查看既有dancing爱好又有music爱好的人
db.user.find({
    'hobbies':{
        "$all":['music','dancing']
        }
})


 (7) 排序 

# 排序:--   1代表升序,  -1代表降序
db.user.find().sort({"name":1,})
db.user.find().sort({"age":-1,'_id':1})

 (8) 分页 

# 分页:--limit代表取多少个document,skip代表跳过前多少个document。
db.user.find().limit(1).skip(2)

 (9) 查询数量 
# 获取数量
db.user.count({'age':{"$gt":30}})

--或者
db.user.find({'age':{"$gt":30}}).count()

 (10) 其它  

# 查找所有
db.user.find() #等同于db.user.find({})
db.user.find().pretty()

#3、查找一个,与find用法一致,只是只取匹配成功的第一个
db.user.findOne({"_id":{"$gt":3}}) 

更新文档

(1) update的语法

update() 方法用于更新已存在的文档。语法格式如下:

db.collection.update(
   <query>,
   <update>,
   {
     upsert: <boolean>,
     multi: <boolean>,
     writeConcern: <document>
   }
)

参数说明:对比update db1.t1 set name='EGON',sex='Male' where name='egon' and age=18;

query : 相当于where条件。
update : update的对象和一些更新的操作符(如$,$inc…等,相当于set后面的
upsert : 可选,默认为false,代表如果不存在update的记录不更新也不插入,设置为true代表插入。
multi : 可选,默认为false,代表只更新找到的第一条记录,设为true,代表更新找到的全部记录。
writeConcern :可选,抛出异常的级别。

更新操作是不可分割的:若两个更新同时发送,先到达服务器的先执行,然后执行另外一个,不会破坏文档。

(1) 覆盖更新 

#注意:除非是删除,否则_id是始终不会变的
#1 :
db.user.update({'age':20},{"name":"xxx"})
是用{"_id":2,"name":"xxx"}覆盖原来的记录

#2、一种最简单的更新就是用一个新的文档完全替换匹配的文档。这适用于大规模式迁移的情况。例如
var obj=db.user.findOne({"name":"张三"})

obj.name=obj.name+'先生'
obj.age++
delete obj.hobbies

db.user.update({"nmae":"张三"},obj)

(2) 局部更新  

#设置:$set

通常文档只会有一部分需要更新。可以使用原子性的更新修改器,指定对文档中的某些字段进行更新。
更新修改器是种特殊的键,用来指定复杂的更新操作,比如修改、增加后者删除

db.user.update({'name':"xxx"},{"$set":{"name":"apple",}})

# 没有匹配成功则新增一条{"upsert":true}
db.user.update({'name':"eric"},{"$set":{"name":"eirc","age":18}},{"upsert":true})

# 默认只改匹配成功的第一条,{"multi":改多条}
db.user.update({'age':{"$gt":20}},{"$set":{"age":18}})
db.user.update({'age':{"$gt":20}},{"$set":{"age":18}},{"multi":true})

# 
db.user.update({'name':"朱七"},{"$set":{"hobbies.1":"swimming"}})

db.user.update({'name':"朱七"},{"$unset":{"hobbies":""}})

(3) 自增或自减  

#增加和减少:$inc

#1、所有人年龄增加一岁
db.user.update({},
    {
        "$inc":{"age":1}
    },
    {
        "multi":true
    }
    )
#2、所有人年龄减少5岁
db.user.update({},
    {
        "$inc":{"age":-5}
    },
    {
        "multi":true
    }
    )


(4) 添加删除数组内元素

#添加删除数组内元素:$push,$pop,$pull

往数组内添加元素:$push
# 为名字为朱七添加一个爱好pingpong
db.user.update({"name":"yuanhao"},{"$push":{"hobbies":"pingpong"}})


# 按照条件删除元素,:"$pull" 把符合条件的统统删掉,而$pop只能从两端删
db.user.update({'addr.country':"China"},{"$pull":{
    "hobbies":"read"}
},
{
    "multi":true
}
)

(5) 避免重复添加

#避免添加重复:"$addToSet"

db.urls.insert({"_id":1,"urls":[]})

db.urls.update({"_id":1},{"$addToSet":{"urls":'http://www.baidu.com'}})
db.urls.update({"_id":1},{"$addToSet":{"urls":'http://www.baidu.com'}})
db.urls.update({"_id":1},{"$addToSet":{"urls":'http://www.baidu.com'}})

db.urls.update({"_id":1},{
    "$addToSet":{
        "urls":{
        "$each":[
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.xxxx.com'
            ]
            }
        }
    }
)

删除文档

#1、删除多个中的第一个
db.user.deleteOne({ 'age': 8 })

#2、删除国家为China的全部
db.user.deleteMany( {'addr.country': 'China'} )

#3、删除全部
db.user.deleteMany({})

【6】PyMongo

在这里我们来看一下Python3下MongoDB的存储操作,在本节开始之前请确保你已经安装好了MongoDB并启动了其服务,另外安装好了Python的PyMongo库。

安装:

pip install pymongo

添加文档

import pymongo

client = pymongo.MongoClient(host='localhost', port=27017)
"""
这样我们就可以创建一个MongoDB的连接对象了。另外MongoClient的第一个参数host还可以直接传MongoDB的连接字符串,以mongodb开头,
例如:client = MongoClient('mongodb://localhost:27017/')可以达到同样的连接效果。
"""
# 指定数据库
# MongoDB中还分为一个个数据库,我们接下来的一步就是指定要操作哪个数据库,在这里我以test数据库为例进行说明,所以下一步我们
# 需要在程序中指定要使用的数据库。

db = client.test
# 调用client的test属性即可返回test数据库,当然也可以这样来指定:
# db = client['test']
#  两种方式是等价的。

# 指定集合
# MongoDB的每个数据库又包含了许多集合Collection,也就类似与关系型数据库中的表,下一步我们需要指定要操作的集合,
# 在这里我们指定一个集合名称为students,学生集合。还是和指定数据库类似,指定集合也有两种方式。

collection = db.students
# collection = db['students']
# 插入数据,接下来我们便可以进行数据插入了,对于students这个Collection,我们新建一条学生数据,以字典的形式表示:

student = {
    'id': '20230001',
    'name': 'yuan',
    'age': 20,
    'gender': 'male'
}
# 在这里我们指定了学生的学号、姓名、年龄和性别,然后接下来直接调用collection的insert()方法即可插入数据。

result = collection.insert_one(student)
print(result)

student1 = {
    'id': '20230002',
    'name': 'rain',
    'age': 24,
    'gender': 'male'
}

student2 = {
    'id': '20230003',
    'name': 'eric',
    'age': 26,
    'gender': 'male'
}

result = collection.insert_many([student1, student2])
print(result)
print(result.inserted_ids)


# insert_many()方法返回的类型是InsertManyResult,调用inserted_ids属性可以获取插入数据的_id列表,运行结果:

# <pymongo.results.InsertManyResult object at 0x101dea558>
# [ObjectId('5932abf415c2607083d3b2ac'), ObjectId('5932abf415c2607083d3b2ad')]

查询文档

import pymongo
from bson.objectid import ObjectId

client = pymongo.MongoClient(host='localhost', port=27017)
db = client.test
collection = db.students

# 查询,插入数据后我们可以利用find_one()或find()方法进行查询,find_one()查询得到是单个结果,find()则返回多个结果。

result = collection.find_one({'name': 'yuan'})
print(type(result))
print(result)
# 在这里我们查询name为yuan的数据,它的返回结果是字典类型,运行结果: <class'dict'>
# {'_id': ObjectId('5932a80115c2606a59e8a049'), 'id': '20170202', 'name': 'Mike', 'age': 21, 'gender': 'male'}
# 可以发现它多了一个_id属性,这就是MongoDB在插入的过程中自动添加的。

# 我们也可以直接根据ObjectId来查询,这里需要使用bson库里面的ObjectId。


result = collection.find_one({'_id': ObjectId('64255f6012345f5a41fb4f56')})
print(result)
# 其查询结果依然是字典类型,运行结果:

# {' ObjectId('593278c115c2602667ec6bae'), 'id': '20170101', 'name': 'Jordan', 'age': 20, 'gender': 'male'}
# 当然如果查询_id':结果不存在则会返回None。

# 对于多条数据的查询,我们可以使用find()方法,例如在这里查找年龄为20的数据,示例如下:

results = collection.find({'age': 20})
print(results, type(result))
for result in results:
    print(result)

# 如果要查询年龄大于20的数据,则写法如下:

results = collection.find({'age': {'$gt': 20}})
# 在这里查询的条件键值已经不是单纯的数字了,而是一个字典,其键名为比较符号$gt,意思是大于,键值为20,这样便可以查询出所有
# 年龄大于20的数据。

# 在这里将比较符号归纳如下表:
"""
符号含义示例
$lt小于{'age': {'$lt': 20}}
$gt大于{'age': {'$gt': 20}}
$lte小于等于{'age': {'$lte': 20}}
$gte大于等于{'age': {'$gte': 20}}
$ne不等于{'age': {'$ne': 20}}
$in在范围内{'age': {'$in': [20, 23]}}
$nin不在范围内{'age': {'$nin': [20, 23]}}
"""
# 另外还可以进行正则匹配查询,例如查询名字以M开头的学生数据,示例如下:

results = collection.find({'name': {'$regex': '^M.*'}})
# 在这里使用了$regex来指定正则匹配,^M.*代表以M开头的正则表达式,这样就可以查询所有符合该正则的结果。
print(results)
# 在这里将一些功能符号再归类如下:
"""
符号含义示例示例含义
$regex匹配正则{'name': {'$regex': '^M.*'}}name以M开头
$exists属性是否存在{'name': {'$exists': True}}name属性存在
$mod数字模操作{'age': {'$mod': [2, 0]}}年龄模2余0
$where高级条件查询{'$where': 'obj.fans_count == obj.follows_count'}自身粉丝数等于关注数
"""
# 这些操作的更详细用法在可以在MongoDB官方文档找到:
# https://docs.mongodb.com/manual/reference/operator/query/

# 计数
# 要统计查询结果有多少条数据,可以调用count()方法,如统计所有数据条数:

count = collection.count_documents({})
print(count)
# 或者统计符合某个条件的数据:

count = collection.count_documents({'age': 20})
print(count)
# 排序
# 可以调用sort方法,传入排序的字段及升降序标志即可,示例如下:

results = collection.find().sort('name', pymongo.ASCENDING)
print([result['name'] for result in results])



# 偏移,可能想只取某几个元素,在这里可以利用skip()方法偏移几个位置,比如偏移2,就忽略前2个元素,得到第三个及以后的元素。

results = collection.find().sort('name', pymongo.ASCENDING).skip(2).limit(2)
print([result['name'] for result in results])

# 值得注意的是,在数据库数量非常庞大的时候,如千万、亿级别,最好不要使用大的偏移量来查询数据,很可能会导致内存溢出,
# 可以使用类似find({'_id': {'$gt': ObjectId('593278c815c2602678bb2b8d')}}) 这样的方法来查询,记录好上次查询的_id。

更新记录

import pymongo

client = pymongo.MongoClient(host='localhost', port=27017)
db = client.test
collection = db.students

# 更新

# condition = {'name': 'yuan'}
# student = collection.find_one(condition)
# student['age'] = 100
# result = collection.update_one(condition, {'$set': student})
# print(result)
# print(result.matched_count, result.modified_count)
# print(collection.find_one({"name": "yuan"}))

# 在这里调用了update_one方法,第二个参数不能再直接传入修改后的字典,而是需要使用{'$set': student}这样的形式,
# 其返回结果是UpdateResult类型,然后调用matched_count和modified_count属性分别可以获得匹配的数据条数和影响的数据条数。

# 运行结果:
#
# <pymongo.results.UpdateResult object at 0x10d17b678>
# 1 0


# 我们再看一个例子:
# print(list(collection.find({"age": {"$gt": 20}})))
# condition = {'age': {'$gt': 20}}
# result = collection.update_one(condition, {'$inc': {'age': 1}})
# print(list(collection.find({"age": {"$gt": 20}})))
# 在这里我们指定查询条件为年龄大于20,然后更新条件为{'$inc': {'age': 1}},执行之后会讲第一条符合条件的数据年龄加1。


# 如果调用update_many()方法,则会将所有符合条件的数据都更新,示例如下:

# condition = {'age': {'$gt': 20}}
# result = collection.update_many(condition, {'$inc': {'age': 1}})
# print(result)
# print(result.matched_count, result.modified_count)
# 这时候匹配条数就不再为1条了,运行结果如下:
#
# <pymongo.results.UpdateResult object at 0x10c6384c8>
# 3 3
# 可以看到这时所有匹配到的数据都会被更新。
# print(list(collection.find({"age": {"$gt": 20}})))

删除文档

import pymongo

client = pymongo.MongoClient(host='localhost', port=27017)
db = client.test
collection = db.students

# 删除

# delete_one()和delete_many()方法,示例如下:

result = collection.delete_one({'name': 'yuan'})
print(result)
print(result.deleted_count)
result = collection.delete_many({'age': {'$lt': 25}})
print(result.deleted_count)

详细用法可以参见官方文档:http://api.mongodb.com/python/current/api/pymongo/collection.html

另外还有对数据库、集合本身以及其他的一些操作,在这不再一一讲解,可以参见

官方文档:http://api.mongodb.com/python/current/api/pymongo/

Flask+mqtt

文档出处:如何在 Flask 项目中使用 MQTT | EMQ (emqx.com)

Flask 是一个使用 Python 编写的轻量级 Web 应用框架,其被称为 “微框架”,因为它使用简单的核心,用扩展增加其他功能,例如:ORM、窗体验证工具、文件上传、各种开放式身份验证技术等。

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可以用极少的代码和带宽为联网设备提供实时可靠的消息服务,它广泛应用于物联网、移动互联网、智能硬件、车联网、电力能源等行业。

本文主要介绍如何在 Flask 项目中实现 MQTT 客户端MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。

我们将使用到 Flask-MQTT 客户端库 ,它是一个 Flask 扩展,可以看作一个 paho-mqtt 的装饰器,用于简化 Flask 应用程序中的 MQTT 集成

项目初始化

pip3 install flask-mqtt

导入 Flask-MQTT

导入 Flask 库以及 Flask-MQTT 扩展,并创建 Flask 应用

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt

app = Flask(__name__)

配置 Flask-MQTT 扩展

app.config['MQTT_BROKER_URL'] = 'broker.emqx.io'
app.config['MQTT_BROKER_PORT'] = 1883
app.config['MQTT_USERNAME'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = ''  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5  # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = False  # 如果你的服务器支持 TLS,请设置为 True
topic = '/flask/mqtt'

mqtt_client = Mqtt(app)

编写连接回调函数

可以在该回调函数中对 MQTT 连接成功或失败的情况进行处理,本示例将在连接成功后订阅 /flask/mqtt 主题。

@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
   if rc == 0:
       print('Connected successfully')
       mqtt_client.subscribe(topic) # 订阅主题
   else:
       print('Bad connection. Code:', rc)

编写消息回调函数

@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
   data = dict(
       topic=message.topic,
       payload=message.payload.decode()
  )
   print('Received message on topic: {topic} with payload: {payload}'.format(**data))

创建发布消息接口

我们创建一个简单的 POST 接口实现 MQTT 消息发布。

在实际应用中该接口可能需要进行一些更复杂的业务逻辑处理

@app.route('/publish', methods=['POST'])
def publish_message():
   request_data = request.get_json()
   publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
   return jsonify({'code': publish_result[0]})

运行 Flask 应用

if __name__ == '__main__':
   app.run(host='127.0.0.1', port=5000)

当 Flask 应用启动后,MQTT 客户端将会连接到服务器,并且订阅主题 /flask/mqtt

测试

接下来我们使用 MQTT 客户端 - MQTTX 进行连接、订阅、发布测试。

测试消息接收

  1. 在 MQTTX 中创建链接并连接到服务器。

image-20240520090354905

  1. 在 MQTTX 中向 /flask/mqtt 主题发布消息 Hello from MQTTX

image-20240520090456465

  1. 在 Flask 运行窗口中将能看到 MQTTX 发送的消息

image-20240520090519238

测试消息发布接口

  1. 在 MQTTX 中订阅 /flask/mqtt_pub 主题。

image-20240520091048693

  1. 使用 Postman 调用 /publish 接口:发送消息 I am msg/flask/mqtt_pub 主题。

image-20240520091020941

  1. 在 MQTTX 中将能看到 Flask 发送过来的消息。

image-20240520091134871

完整代码+Mongodb存储

import json

from flask import Flask, request, jsonify
from flask_mqtt import Mqtt
from pymongo import MongoClient

# mongodb://localhost/ 数据库连接地址
client_db = MongoClient("mongodb://localhost/")
# 连接到mongodb数据库
db = client_db.mqttDb
# 创建集合
data_collection = db.mqttDbTest

app = Flask(__name__)

app.config['MQTT_BROKER_URL'] = '43.138.68.173'  # mqtt服务器地址
app.config['MQTT_BROKER_PORT'] = 1883  # mqtt服务器端口号
app.config['MQTT_USERNAME'] = 'admin'  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_PASSWORD'] = 'public'  # 当你需要验证用户名和密码时,请设置该项
app.config['MQTT_KEEPALIVE'] = 5  # 设置心跳时间,单位为秒
app.config['MQTT_TLS_ENABLED'] = False  # 如果你的服务器支持 TLS,请设置为 True
subtopic = '/flask/mqtt'  # 订阅主题

mqtt_client = Mqtt(app)


# 连接mqtt
@mqtt_client.on_connect()
def handle_connect(client, userdata, flags, rc):
    if rc == 0:
        print('Connected successfully')
        mqtt_client.subscribe(subtopic)
    else:
        print('Bad connection. Code:', rc)


# 监听收到的消息
@mqtt_client.on_message()
def handle_mqtt_message(client, userdata, message):
    data = dict(
        topic=message.topic,
        payload=message.payload.decode()
    )
    res = json.loads(data.get('payload'))
    print(res)
    data_collection.insert_one(res)
    print('插入成功')
    print('Received message on topic: {topic} with payload: {payload}'.format(**data))


# 发布消息
@app.route('/publish', methods=['POST'])
def publish_message():
    request_data = request.get_json()
    publish_result = mqtt_client.publish(request_data['topic'], request_data['msg'])
    return jsonify({'code': publish_result[0]})


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=5000)