support upload file form browser
This commit is contained in:
parent
3e15c7229c
commit
5f3e2edb01
|
@ -15,6 +15,11 @@ from app_configuration import app
|
|||
from app_plot import *
|
||||
|
||||
from history_data import *
|
||||
import random
|
||||
import base64
|
||||
import time
|
||||
from os.path import exists
|
||||
from os import makedirs
|
||||
import global_var
|
||||
|
||||
|
||||
|
@ -217,40 +222,69 @@ def app_callback_function():
|
|||
|
||||
|
||||
|
||||
# 上传文件回调
|
||||
@app.callback(
|
||||
dash.dependencies.Output('auto_find_text_flag', 'value'),
|
||||
[
|
||||
dash.dependencies.Input('dcc_upload_file', 'contents')
|
||||
]
|
||||
)
|
||||
def update(contents):
|
||||
|
||||
if contents is not None:
|
||||
|
||||
# 接收base64编码的数据
|
||||
content_type, content_string = contents.split(',')
|
||||
|
||||
# 将客户端上传的文件进行base64解码
|
||||
decoded = base64.b64decode(content_string)
|
||||
|
||||
# 为客户端上传的文件添加后缀,防止文件重复覆盖
|
||||
# 以下方式确保文件名不重复
|
||||
suffix = [str(random.randint(0,100)) for i in range(10)]
|
||||
suffix = "".join(suffix)
|
||||
suffix = suffix + str(int(time.time()))
|
||||
|
||||
# 最终的文件名
|
||||
file_name = 'History_' + suffix
|
||||
# print(file_name)
|
||||
|
||||
# 创建存放文件的目录
|
||||
if (not (exists('data'))):
|
||||
makedirs('data')
|
||||
|
||||
# 欲写入的文件路径
|
||||
path = 'data' + '/' + file_name
|
||||
|
||||
# 写入本地磁盘文件
|
||||
with open(file=path, mode='wb+') as f:
|
||||
f.write(decoded)
|
||||
|
||||
|
||||
# 使用sqlite读取本地磁盘文件
|
||||
# 获取历史记录数据,跨文件全局变量
|
||||
history_data = get_history_data(path)
|
||||
global_var.set_value('history_data', history_data)
|
||||
# for i in history_data:
|
||||
# print(i)
|
||||
|
||||
# 获取搜索关键词数据,跨文件全局变量
|
||||
search_word = get_search_word(path)
|
||||
global_var.set_value('search_word', search_word)
|
||||
# for i in search_word:
|
||||
# print(i)
|
||||
|
||||
# 判断读取到的数据是否正确
|
||||
if (history_data != 'error'):
|
||||
# 找到
|
||||
date_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
|
||||
print('新接收到一条客户端的数据, 数据正确, 时间:{}'.format(date_time))
|
||||
return 1
|
||||
else:
|
||||
# 没找到
|
||||
date_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
|
||||
print('新接收到一条客户端的数据, 数据错误, 时间:{}'.format(date_time))
|
||||
return 0
|
||||
|
||||
return 0
|
||||
|
||||
#
|
||||
# # 判断是否自动寻找到历史记录文件
|
||||
# @app.callback(
|
||||
# dash.dependencies.Output('auto_find_text_flag', 'value'),
|
||||
# [
|
||||
# dash.dependencies.Input('first_load_web_page', 'value')
|
||||
# ]
|
||||
# )
|
||||
# def update(value):
|
||||
#
|
||||
# # value对象不为null
|
||||
# if (value is not None):
|
||||
#
|
||||
# # 获取历史记录数据,跨文件全局变量
|
||||
# history_data = get_history_data()
|
||||
# global_var.set_value('history_data', history_data)
|
||||
# # for i in history_data:
|
||||
# # print(i)
|
||||
#
|
||||
# # 获取搜索关键词数据,跨文件全局变量
|
||||
# search_word = get_search_word()
|
||||
# global_var.set_value('search_word', search_word)
|
||||
# # for i in search_word:
|
||||
# # print(i)
|
||||
#
|
||||
#
|
||||
# if (history_data != 'error'):
|
||||
# # 找到
|
||||
# return 1
|
||||
# else:
|
||||
# # 没找到
|
||||
# return 0
|
||||
# else:
|
||||
# return 0
|
|
@ -9,46 +9,9 @@
|
|||
@mail: sqzhang77@gmail.com
|
||||
"""
|
||||
|
||||
from platform import system
|
||||
import os
|
||||
import shutil
|
||||
import sqlite3
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# 获取历史记录文件所在的路径
|
||||
def get_history_file_path():
|
||||
|
||||
# 获取操作系统信息
|
||||
sys_str = system()
|
||||
|
||||
# 历史记录文件所在的路径
|
||||
file_path = ""
|
||||
|
||||
if('Windows' in sys_str):
|
||||
# print('Windows')
|
||||
home = os.environ['HOMEPATH']
|
||||
file_path = home + "\\AppData\\Local\\Google\\Chrome\\User Data\\Default\\History"
|
||||
elif('Darwin' in sys_str):
|
||||
# print('macOSX')
|
||||
home = os.environ['HOME']
|
||||
file_path = home + "/Library/Application Support/Google/Chrome/Default/History"
|
||||
elif('Linux' in sys_str):
|
||||
# print('Linux')
|
||||
home = os.environ['HOME']
|
||||
file_path = ""
|
||||
else:
|
||||
# print('unknown')
|
||||
file_path = ""
|
||||
|
||||
print('历史记录文件路径为: {}'.format(file_path))
|
||||
return file_path
|
||||
|
||||
|
||||
|
||||
|
||||
# 查询数据库内容
|
||||
def query_sqlite_db(history_db, query):
|
||||
|
||||
|
@ -77,28 +40,14 @@ def query_sqlite_db(history_db, query):
|
|||
|
||||
|
||||
# 获取排序后的历史数据
|
||||
def get_history_data():
|
||||
# 获取历史记录文件所在的路径
|
||||
# 根据不同操作系统自动获取对应的历史记录文件
|
||||
history_file_path = get_history_file_path()
|
||||
def get_history_data(history_file_path):
|
||||
|
||||
# 判断该历史文件是否存在
|
||||
# 若不存在,则需要手动将历史记录文件复制到当前目录下
|
||||
if not os.path.exists(history_file_path):
|
||||
print('历史记录文件不存在!')
|
||||
return 'error'
|
||||
|
||||
|
||||
# 复制(拷贝)历史记录文件到当前目录下
|
||||
# 因为历史记录其实是一个sqlite数据库文件,sqlite是单线程,浏览器在运行时,数据库是默认锁住的
|
||||
# 注意,History是一个文件,没有后缀名。它不是一个目录。
|
||||
try:
|
||||
shutil.copyfile(history_file_path, './History')
|
||||
|
||||
# 获取数据库内容
|
||||
# 数据格式为元组(tuple)
|
||||
select_statement = "SELECT urls.id, urls.url, urls.title, urls.last_visit_time, urls.visit_count, visits.visit_time, visits.from_visit, visits.transition, visits.visit_duration FROM urls, visits WHERE urls.id = visits.url;"
|
||||
result = query_sqlite_db('./History', select_statement)
|
||||
result = query_sqlite_db(history_file_path, select_statement)
|
||||
|
||||
# 将结果按第1个元素进行排序
|
||||
# sort和sorted内建函数会优先排序第1个元素,然后再排序第2个元素,依此类推
|
||||
|
@ -107,34 +56,20 @@ def get_history_data():
|
|||
# 返回排序后的数据
|
||||
return result_sort
|
||||
except:
|
||||
print('复制历史文件出错!')
|
||||
# print('读取出错!')
|
||||
return 'error'
|
||||
|
||||
|
||||
|
||||
# 获取 搜索关键词 数据
|
||||
def get_search_word():
|
||||
# 获取历史记录文件所在的路径
|
||||
# 根据不同操作系统自动获取对应的历史记录文件
|
||||
history_file_path = get_history_file_path()
|
||||
def get_search_word(history_file_path):
|
||||
|
||||
# 判断该历史文件是否存在
|
||||
# 若不存在,则需要手动将历史记录文件复制到当前目录下
|
||||
if not os.path.exists(history_file_path):
|
||||
print('历史记录文件不存在!')
|
||||
return 'error'
|
||||
|
||||
|
||||
# 复制(拷贝)历史记录文件到当前目录下
|
||||
# 因为历史记录其实是一个sqlite数据库文件,sqlite是单线程,浏览器在运行时,数据库是默认锁住的
|
||||
# 注意,History是一个文件,没有后缀名。它不是一个目录。
|
||||
try:
|
||||
shutil.copyfile(history_file_path, './History_Search')
|
||||
|
||||
# 获取数据库内容
|
||||
# 数据格式为元组(tuple)
|
||||
select_statement = "SELECT keyword_search_terms.url_id, keyword_search_terms.term, urls.url, urls.last_visit_time from keyword_search_terms LEFT JOIN urls on keyword_search_terms.url_id=urls.id;"
|
||||
result = query_sqlite_db('./History_Search', select_statement)
|
||||
result = query_sqlite_db(history_file_path, select_statement)
|
||||
|
||||
# 将结果按第1个元素(下标为0)进行升序排序
|
||||
result_sort = sorted(result, key=lambda x: (x[0]))
|
||||
|
@ -142,5 +77,5 @@ def get_search_word():
|
|||
# 返回排序后的数据
|
||||
return result_sort
|
||||
except:
|
||||
print('复制历史文件出错!')
|
||||
# print('读取出错!')
|
||||
return 'error'
|
Loading…
Reference in New Issue