#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# @Time : 2018/4/13 9:20
# @Author : Hannes
# @File : local_to_es.py
import re
from app.transport.table_conf import TransportConf
from base.func_base import date_func
from base.func_base.base_func import *
from base.func_base.file import text_func
from query.util.es_func import NodeEnum, OpEnum
from query.view_base.stream_view import StreamView
from tmp import WEIXIN_FOLDER, DAOCHU_FOLDER
from typing import List
import datetime
import xlrd
from xlrd import Book, xldate_as_tuple
from xlrd.sheet import Sheet, Cell, XL_CELL_DATE
from xlwt import Worksheet
from base.define import *
# Maps a level/target name to the NodeEnum ES node it is written to.
# Keys look like data-warehouse layers (ods/dw/dm) plus a "new_es" target —
# presumably; confirm against the callers that read this table.
level_node_dict = dict(
    ods=NodeEnum.LOCAL,
    dw=NodeEnum.LOCAL,
    dm=NodeEnum.LOCAL,
    new_es=NodeEnum.NODE_172,
)
def local_to_view(conf_body: TransportConf) -> StreamView:
    """Build a StreamView from the local file described by ``conf_body``.

    Uses the Excel reader when ``conf_body.sheet`` is truthy, otherwise the
    JSON-lines text reader.
    """
    reader = excel_to_view if conf_body.sheet else txt_to_view
    return reader(conf_body)
def txt_to_view(conf_body: TransportConf) -> StreamView:
    """Build a StreamView from the text file at ``conf_body.txt["path"]``.

    Every line read by ``text_func.read_lines`` is decoded with ``json.loads``.
    The resulting view is then passed through ``conf_body.view_transforming``.
    """
    # No explicit ``import json`` is visible in this file; ``json`` must come
    # from one of the star imports at the top.
    stream_view = StreamView(text_func.read_lines(conf_body.txt["path"]))
    # Bug fix: previously this called the module-global ``conf``, which only
    # exists when the file is run as a script, instead of the argument.
    return conf_body.view_transforming(stream_view.map(json.loads))
def excel_to_view_project(conf_body: TransportConf) -> StreamView:
    """Unfinished variant of ``excel_to_view``.

    Reads the configured workbook and sheet and parses the header row, but
    builds no view. It always returns None despite the ``StreamView``
    annotation.
    """
    from base.func_base.file import excel_func

    workbook_sheets = excel_func.read_excel_sheets(conf_body.sheet["path"])
    rows = excel_func.get_sheet_ll(workbook_sheets[conf_body.sheet["index"]])
    # Header is parsed but not used yet (work in progress).
    get_head(rows[0])
    return None
def excel_to_view(conf_body: TransportConf) -> StreamView:
    """Build a StreamView of row dicts from the Excel sheet in ``conf_body.sheet``.

    ``conf_body.sheet["path"]`` names the workbook and
    ``conf_body.sheet["index"]`` selects the sheet. The first row is the header.
    Every later row becomes a ``{header_name: cell}`` dict:

    - string cells are stripped;
    - strings for which ``is_void`` is true become None;
    - other cell values pass through unchanged.

    Only the first ``min(len(row), len(header))`` columns are used. The view is
    finished by ``conf_body.view_transforming``.
    """
    from base.func_base.file import excel_func

    sheets = excel_func.read_excel_sheets(conf_body.sheet["path"])
    sheet_ll = excel_func.get_sheet_ll(sheets[conf_body.sheet["index"]])
    head = get_head(sheet_ll[0])
    # Presumably validates the header against the configured fields — confirm
    # in TransportConf.fields_check.
    conf_body.fields_check(head)

    def value_or_none(v):
        # Trim strings and map void ones to None; non-strings are untouched.
        if isinstance(v, str):
            v = v.strip()
            return v if not is_void(v) else None
        return v

    def fn(li: list):
        # zip() stops at the shorter of header and row, matching the previous
        # min(len(li), len(head)) bound.
        return {name: value_or_none(cell) for name, cell in zip(head, li)}

    stream_view = StreamView(sheet_ll[1:]).map(fn)
    # Bug fix: previously this called the module-global ``conf`` (defined only
    # under ``__main__``) instead of the ``conf_body`` argument.
    return conf_body.view_transforming(stream_view)
@list_arg_strip
def get_head(li: list):
    """Return a new list holding the header cells of ``li``.

    Any per-cell processing (presumably stripping) comes from the
    ``list_arg_strip`` decorator, whose definition is not visible here.
    """
    return list(li)
@list_arg_strip
def handle_one_line(li: list, head: list, field_func):
    """Pair one row's cells with the header names, applying column converters.

    Only the first ``min(len(li), len(head))`` columns are used. A column whose
    name is a key of ``field_func`` is passed through that converter; other
    cells are kept as-is. Entries whose resulting value is None or "" are left
    out of the returned dict.
    """
    row = {}
    for column, raw in zip(head, li):
        value = field_func[column](raw) if column in field_func else raw
        if value is None or value == "":
            # A blank value also removes an earlier entry for a repeated column.
            row.pop(column, None)
        else:
            row[column] = value
    return row
def get_weight_json(d: dict, info_md5, update_time, source_id):
    """Build one weight record per key of ``d``.

    Each record names the field and carries ``info_md5`` and ``source_id``.
    ``update_time`` is used for both its "update_time" and "add_time" entries.
    """
    return [
        {
            "update_time": update_time,
            "field": field_name,
            "info_md5": info_md5,
            "source_id": source_id,
            "add_time": update_time,
        }
        for field_name in d
    ]
def split_char_exchange(s: str):
    """Replace every Chinese enumeration comma "、" in ``s`` with a tab."""
    return "\t".join(s.split("、"))
def format_time(input_ori: str):
    """Turn a compact ``yyyymmdd`` value into ``"yyyy-mm-dd"``.

    Accepts strings such as ``"20180413"`` as well as numeric cell values such
    as ``20180413.0``. These go through ``float`` and then ``int`` to drop a
    trailing ``.0``. Returns None for None or blank input.
    """
    # Bug fix: the emptiness check used to run after float() had already been
    # applied, so it was unreachable and None/"" raised instead of returning None.
    if input_ori is None or str(input_ori).strip() == "":
        return None
    digits = str(int(float(input_ori)))
    return f"{digits[0:4]}-{digits[4:6]}-{digits[6:]}"
def province(value: str):
    """Drop a single trailing "省" from ``value``.

    Void values and values not ending in "省" are returned unchanged.
    """
    if is_void(value) or not value.endswith("省"):
        return value
    return value[:-1]
def city(value: str):
    """Normalise a city name.

    - A single trailing "市" is dropped.
    - "恩施土家族苗族自治州" is shortened to "恩施州".
    - Void or other values are returned unchanged.
    """
    if is_void(value):
        return value
    if value.endswith("市"):
        return value[:-1]
    if value == "恩施土家族苗族自治州":
        return "恩施州"
    return value
# Full names of autonomous regions and municipalities -> their short names.
# Used by province_handle after any trailing "省" has been stripped.
province_map = dict([
    ("内蒙古自治区", "内蒙古"),
    ("新疆维吾尔自治区", "新疆"),
    ("广西壮族自治区", "广西"),
    ("宁夏回族自治区", "宁夏"),
    ("西藏自治区", "西藏"),
    ("北京市", "北京"),
    ("上海市", "上海"),
    ("天津市", "天津"),
    ("重庆市", "重庆"),
])
def ymd(ymdhms_str: str):
    """Reduce a date-time string to its date part using the ``date_func`` helpers.

    Returns None for void input.
    """
    if is_void(ymdhms_str):
        return None
    # Local renamed so it no longer shadows the imported ``datetime`` module.
    parsed = date_func.str_ymdhms_to_datetime(ymdhms_str)
    return date_func.datetime_to_str_ymd(parsed)
def province_handle(address_province: str):
    """Normalise a province name to its short form.

    Trailing "省" characters are stripped, and the result is looked up in
    ``province_map``; unmapped names are returned as-is. Void input gives None.
    """
    if is_void(address_province):
        return None
    short_name = address_province
    if short_name.endswith("省"):
        short_name = short_name.rstrip("省")
    return province_map.get(short_name, short_name)
def date_append(s: str):
    """Append "-01-01" to a year string; void input is returned unchanged."""
    return s if is_void(s) else s + "-01-01"
def comma_to_tab(input: str):
    """Split on "、", strip each part, and rejoin the parts with tabs.

    Void input gives None.
    """
    if is_void(input):
        return None
    return "\t".join(part.strip() for part in input.split("、"))
def used_name_to_nested(s: str):
    """Split a list of names into nested objects ``[{"name": ...}, ...]``.

    The stripped input is split on ASCII ";", full-width "；" and tab. Items
    themselves are not stripped. Void input gives None.
    """
    if is_void(s):
        return None
    # The old class "[;;\t]" listed ";" twice. The duplicate was presumably a
    # full-width "；" that got normalised to ASCII, so it is restored here.
    return [{"name": x} for x in re.split(r"[;；\t]", s.strip())]
def institution_list_to_nested(s: str):
    """Split a "、"-separated institution list into ``[{"name": ...}, ...]``.

    The whole input is stripped, but individual items are not. Void input
    gives None.
    """
    if is_void(s):
        return None
    # "、" is not a regex metacharacter, so a plain str.split is equivalent.
    return [{"name": part} for part in s.strip().split("、")]
def level_nested_to_nested(s: str):
    """Split a list of levels into nested objects ``[{"level": ...}, ...]``.

    The stripped input is split on ASCII ";", full-width "；" and tab. Items
    themselves are not stripped. Void input gives None.
    """
    if is_void(s):
        return None
    # The old class "[;;\t]" listed ";" twice. The duplicate was presumably a
    # full-width "；" that got normalised to ASCII, so it is restored here.
    return [{"level": x} for x in re.split(r"[;；\t]", s.strip())]
def other_name_to_nested(s: str):
    """Split "、"-separated aliases into ``[{"name": alias}, ...]``.

    Each alias is stripped. Void input gives None.
    """
    if is_void(s):
        return None
    return [{"name": piece.strip()} for piece in s.split("、")]
def medical_nested(s: str):
    """Parse comma-separated label paths such as ``"a>b,c"`` into dicts.

    Each path becomes ``{"label_1": ..., "label_2": ..., ...}``, with one key
    per ">"-separated level counted from 1. Segments are not stripped. Void
    input gives None.
    """
    if is_void(s):
        return None
    return [
        {f"label_{depth}": label for depth, label in enumerate(path.split(">"), 1)}
        for path in s.strip().split(",")
    ]
def strip(s: str):
    """Return ``s`` without surrounding whitespace, or None when it is void."""
    return None if is_void(s) else s.strip()
def point_to_minus(s: str):
    """Replace every "." with "-" (e.g. "2018.04.13" -> "2018-04-13").

    Void input gives None.
    """
    return None if is_void(s) else s.replace(".", "-")
def major_func(s: str):
    """Wrap ``str(int(s))`` under "total_industrial_output_value_above_scale".

    Void input gives None.
    """
    if is_void(s):
        return None
    value = str(int(s))
    return {"total_industrial_output_value_above_scale": value}
def ismedical_1090_func(s: str):
    """Map "是" ("yes") to "1" and any other non-void value to "0".

    Void input gives None.
    """
    if is_void(s):
        return None
    return "1" if "是" == s else "0"
def time_market_1090_func(s: str):
    """Return ``s`` stripped, or None when it is void or the "--" placeholder."""
    if is_void(s):
        return None
    trimmed = s.strip()
    return None if trimmed == "--" else trimmed
def loads(s: str):
    """Decode ``s`` as JSON, or return None when it is void."""
    return None if is_void(s) else json.loads(s)
def dm_map_medical_institution_func(tp: tuple):
    """Assign ``medical_id = md5_str(name + address_province)`` to a record.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. Void fields
    count as "". Returns ``(medical_id, doc)``; unlike the sibling dm_map_*
    functions, the id is not hashed a second time.
    """
    doc: dict = tp[1]
    address_province = doc.get("address_province")
    address_province = "" if is_void(address_province) else address_province
    name = doc.get("name")
    name = "" if is_void(name) else name
    doc["medical_id"] = md5_str(name + address_province)
    return doc["medical_id"], doc
def dm_map_physical_organization_func(tp: tuple):
    """Assign ``org_id = md5_str(doc["name"])`` and key the record by ``md5_str(org_id)``.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. A missing
    "name" raises KeyError.
    """
    doc: dict = tp[1]
    org_id = md5_str(doc["name"])
    doc["org_id"] = org_id
    return md5_str(org_id), doc
def dm_map_social_organization_func(tp: tuple):
    """Assign ``org_id = md5_str(org_name)`` and key the record by ``md5_str(org_id)``.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. A void
    "org_name" counts as "".
    """
    doc: dict = tp[1]
    org_name = doc.get("org_name")
    org_id = md5_str("" if is_void(org_name) else org_name)
    doc["org_id"] = org_id
    return md5_str(org_id), doc
def dm_map_major_tax_illegal_func(tp: tuple):
    """Assign ``id = md5_str(case_id + taxpayer_name)`` and key the record by ``md5_str(id)``.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. Void fields
    count as "".
    """
    doc: dict = tp[1]
    case_id = doc.get("case_id")
    case_id = "" if is_void(case_id) else case_id
    taxpayer = doc.get("taxpayer_name")
    taxpayer = "" if is_void(taxpayer) else taxpayer
    record_id = md5_str(case_id + taxpayer)
    doc["id"] = record_id
    return md5_str(record_id), doc
def dm_map_industrial_park(tp: tuple):
    """Assign ``id = md5_str(name + address_province)`` and key the record by ``md5_str(id)``.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. Void fields
    count as "".
    """
    doc: dict = tp[1]
    name = doc.get("name")
    name = "" if is_void(name) else name
    address_province = doc.get("address_province")
    address_province = "" if is_void(address_province) else address_province
    park_id = md5_str(name + address_province)
    doc["id"] = park_id
    return md5_str(park_id), doc
def dm_map_research_institution_func(tp: tuple):
    """Assign ``institution_id`` and key the record by ``md5_str(institution_id)``.

    The id is ``md5_list`` over the truthy values of address, name,
    person_liable and type, in that order. Falsy or missing fields are
    skipped. ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored.
    """
    doc: dict = tp[1]
    key_fields = ("address", "name", "person_liable", "type")
    present = [value for value in (doc.get(key) for key in key_fields) if value]
    institution_id = md5_list(present)
    doc["institution_id"] = institution_id
    return md5_str(institution_id), doc
def tab_str_replece(s: str):
    """Replace literal two-character "\\t" sequences with real tabs.

    Each piece between them is stripped. Void input gives None.
    """
    if is_void(s):
        return None
    pieces = s.split("\\t")
    return "\t".join(piece.strip() for piece in pieces)
def park_func(tp: tuple):
    """Re-key an ``(id, doc)`` pair by ``doc["park_id"]``; the incoming id is dropped."""
    _, doc = tp
    return doc["park_id"], doc
def research_institution_func(tp: tuple):
    """Re-key an ``(id, doc)`` pair by ``doc["institution_id"]``; the incoming id is dropped."""
    _, doc = tp
    return doc["institution_id"], doc
def cydn_project_func(tp: tuple):
    """Return the ``(id, doc)`` pair unchanged, repacked as a 2-tuple."""
    key, doc = tp
    return key, doc
def dm_cydn_professional_platform_infos_func(tp: tuple):
    """Key the record by ``md5_str(map_id + source_doc_id)``.

    The new id is also stored in ``doc["id"]``; the incoming id is discarded.
    """
    _, doc = tp
    new_id = md5_str(doc["map_id"] + doc["source_doc_id"])
    doc["id"] = new_id
    return new_id, doc
def quiz_ask_func(s: str):
    """Split ``str(s)`` on "," and "." into ``[{"label": part}, ...]``.

    Each part is stripped. Void input gives None.
    """
    if is_void(s):
        return None
    parts = re.split("[,.]", str(s))
    return [{"label": part.strip()} for part in parts]
def dm_map_flightinspection_info_func(tp: tuple):
    """Assign ``record_id = md5_str(check_date + company_name)`` and key the record by ``md5_str(record_id)``.

    ``tp`` is an ``(id, doc)`` pair and the incoming id is ignored. Void fields
    count as "".
    """
    doc: dict = tp[1]
    check_date = doc.get("check_date")
    company_name = doc.get("company_name")
    check_date = "" if is_void(check_date) else check_date
    company_name = "" if is_void(company_name) else company_name
    record_id = md5_str(check_date + company_name)
    doc["record_id"] = record_id
    return md5_str(record_id), doc
def label_level(v: str):
    """Map a level label ("一级" .. "五级", i.e. level 1..5) to a 0-based int.

    Any other value gives None.
    """
    levels = ("一级", "二级", "三级", "四级", "五级")
    return {name: rank for rank, name in enumerate(levels)}.get(v)
def dm_map_com_label_keyword_query_func(tp: tuple):
    """Return None when the record's "label_level" is void, else return ``tp`` unchanged.

    Presumably the caller drops records mapped to None — confirm against the
    view's final_func handling.
    """
    level = tp[1].get("label_level")
    return None if is_void(level) else tp
def _tmp_1(tp: tuple):
    """Scratch hook that ignores its input and returns None for every record."""
    return
def __tmp(tp: tuple):
    """Scratch hook: re-key an ``(id, doc)`` pair by ``doc['medical_id']``."""
    _, doc = tp
    return doc['medical_id'], doc
def _filter_value(tp: tuple):
    """Re-key an ``(id, doc)`` pair by ``doc['medical_id']``.

    Despite the name, no values are filtered.
    """
    _, doc = tp
    return doc['medical_id'], doc
if __name__ == '__main__':
    # One-off transfer configuration. Uncomment entries in "field_func" to
    # attach the column converters defined above to specific columns.
    config = {
        "table": "dm_map_literature_1",
        "need_weight_json": False,
        "field_func": {
            # "major_economic_industry_indicators": major_func
            # "time_market_1090": time_market_1090_func
            # "detail_author_nested": loads,
            # "author": tab_str_replece,
            # "institute": tab_str_replece,
            # "person": loads
            # "other_name": other_name_to_nested,
            # "medical_label": loads,
            # "quiz_ask": quiz_ask_func,
            # "interventions": quiz_ask_func,
            # "complication": quiz_ask_func,
            # "symptom": quiz_ask_func,
            # "drug_name": quiz_ask_func
            # "relation_infos": loads,
            # "influence_factors_nested": loads,
            # "main_supplier": loads,
            # "label_level": label_level
        },
        # Expected shape: Callable[[Tuple[_id, k-v_dict]], Tuple[_id, k-v_dict]]
        "final_func": None,
        "sheet": {
            "path": r"E:\公司\数据表\1688无年份.xlsx",
            "index": 0,
        },
        # "txt": {
        #     "path": f"{DAOCHU_FOLDER}/dm_map_social_organization.txt",
        #     "index": 0,
        # },
        # "pkey_li": [
        #     "sub_name",
        #     "name"
        # ]
    }
    # Kept as the module-global name ``conf``: older versions of the readers
    # look it up as a global.
    conf = TransportConf(config)
    view = local_to_view(conf)
    update_stamp = date_func.current_ymdhms()

    def stamp_update_time(record):
        """Set the shared "SYS_dm_update_time" stamp on one (id, doc) pair."""
        doc_id, doc = record
        doc["SYS_dm_update_time"] = update_stamp
        return doc_id, doc

    view = view.map(stamp_update_time)
    # If uploading to two ES clusters, cache the view first (presumably so it
    # is not recomputed for each write — confirm against StreamView.cache).
    view.cache()
    # view.write_to_es(index=conf.table, node_enum=NodeEnum.LOCAL)
    # view.write_to_es(index=conf.table, node_enum=NodeEnum.NODE_172)
    # view.write_to_es(index=conf.table, node_enum=NodeEnum.TEST)
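# Stray text "打赏作者" ("tip the author") from the web page this script was copied from; it is not Python, so it is kept only as this comment.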