菜单

Python:将excel表单信息在es数据库新建表

2019-05-06 - 代码整理
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
# @Time    : 2018/4/13 9:20
# @Author  : Hannes
# @File    : local_to_es.py
import re

from app.transport.table_conf import TransportConf
from base.func_base import date_func
from base.func_base.base_func import *
from base.func_base.file import text_func
from query.util.es_func import NodeEnum, OpEnum
from query.view_base.stream_view import StreamView
from tmp import WEIXIN_FOLDER, DAOCHU_FOLDER

from typing import List

import datetime
import xlrd
from xlrd import Book, xldate_as_tuple
from xlrd.sheet import Sheet, Cell, XL_CELL_DATE
from xlwt import Worksheet

from base.define import *

# Maps the level keys "ods", "dw", "dm" and "new_es" to NodeEnum members.
# NOTE(review): presumably picks the target ES node for each data level; the
# mapping is not read anywhere in the visible part of this file — confirm usage.
level_node_dict = {
    "ods": NodeEnum.LOCAL,
    "dw": NodeEnum.LOCAL,
    "dm": NodeEnum.LOCAL,
    "new_es": NodeEnum.NODE_172
}


def local_to_view(conf_body: TransportConf) -> StreamView:
    """Build a view from the local source described by ``conf_body``.

    Dispatches to ``excel_to_view`` when ``conf_body.sheet`` is truthy and to
    ``txt_to_view`` otherwise, returning whatever the chosen loader returns.
    """
    loader = excel_to_view if conf_body.sheet else txt_to_view
    return loader(conf_body)


def txt_to_view(conf_body: TransportConf) -> StreamView:
    """Load a text file of JSON lines into a transformed view.

    Reads the lines of ``conf_body.txt["path"]``, parses each line with
    ``json.loads`` and hands the resulting view to
    ``conf_body.view_transforming``.

    :param conf_body: transport configuration; its ``txt`` entry must contain
        a "path" key.
    :return: whatever ``conf_body.view_transforming`` returns for the parsed view.
    """
    stream_view = StreamView(text_func.read_lines(conf_body.txt["path"]))

    def parse_line(line: str):
        # One JSON document per line.
        return json.loads(line)

    # Use the configuration that was passed in.  The previous code read a
    # module-level ``conf`` which this file only assigns under the
    # ``__main__`` guard, so the function silently depended on script state
    # instead of on its own argument.
    return conf_body.view_transforming(stream_view.map(parse_line))


def excel_to_view_project(conf_body: TransportConf) -> StreamView:
    """Unfinished variant of ``excel_to_view``.

    Reads the configured sheet and extracts its header row, but nothing is
    built from it yet: the function always returns None.
    """
    from base.func_base.file import excel_func

    workbook_sheets = excel_func.read_excel_sheets(conf_body.sheet["path"])
    rows = excel_func.get_sheet_ll(workbook_sheets[conf_body.sheet["index"]])

    # Header row of the sheet; computed but not used so far.
    _header = get_head(rows[0])

    return None


def excel_to_view(conf_body: TransportConf) -> StreamView:
    """Load one Excel sheet into a transformed view of row dicts.

    The first row of the sheet is treated as the header and is passed to
    ``conf_body.fields_check``.  Every following row becomes a dict keyed by
    header name; string cells are stripped and replaced by None when
    ``is_void`` reports them as void.  Cells beyond the header width and
    header columns beyond the row width are dropped.

    :param conf_body: transport configuration; its ``sheet`` entry must
        contain "path" and "index".
    :return: the result of ``conf_body.view_transforming`` on the row-dict view.
    """
    from base.func_base.file import excel_func
    sheets = excel_func.read_excel_sheets(conf_body.sheet["path"])
    sheet_ll = excel_func.get_sheet_ll(sheets[conf_body.sheet["index"]])

    head = get_head(sheet_ll[0])
    conf_body.fields_check(head)

    def value_or_none(v):
        # Only strings are normalised; other cell values pass through as-is.
        if not isinstance(v, str):
            return v
        v = v.strip()
        return None if is_void(v) else v

    def row_to_dict(li: list):
        # zip stops at the shorter of header and row, which is exactly the
        # min(len(li), len(head)) bound the index-based loop used to compute.
        return {name: value_or_none(cell) for name, cell in zip(head, li)}

    stream_view = StreamView(sheet_ll[1:]).map(row_to_dict)

    # Use the configuration that was passed in rather than the module-level
    # ``conf``, which this file only assigns under the ``__main__`` guard.
    return conf_body.view_transforming(stream_view)


@list_arg_strip
def get_head(li: list):
    """Return a new list holding the items of ``li``.

    NOTE(review): any stripping is presumably done by the ``list_arg_strip``
    decorator before this body runs — confirm against its definition.
    """
    return list(li)


@list_arg_strip
def handle_one_line(li: list, head: list, field_func):
    """Turn one row into a dict keyed by header name.

    Cells are paired with header names position by position, up to the
    shorter of the two lists.  When a header name is present in
    ``field_func`` the cell is passed through the registered callable,
    otherwise it is taken as-is.  Results that are None or "" are left out
    of the returned dict.
    """
    record = {}

    for key, raw in zip(head, li):
        if key in field_func:
            value = field_func.get(key, void_return)(raw)
        else:
            value = raw

        if value is None or value == "":
            # An empty result also removes a value stored earlier under the
            # same header name.
            record.pop(key, None)
        else:
            record[key] = value

    return record


def get_weight_json(d: dict, info_md5, update_time, source_id):
    """Build one weight entry per key of ``d``.

    Each entry records the key as "field" together with ``info_md5`` and
    ``source_id``; ``update_time`` is stored as both "update_time" and
    "add_time".  Returns the entries as a list in the iteration order of ``d``.
    """
    return [
        {
            "update_time": update_time,
            "field": field_name,
            "info_md5": info_md5,
            "source_id": source_id,
            "add_time": update_time,
        }
        for field_name in d
    ]


def split_char_exchange(s: str):
    """Return ``s`` with every "、" separator replaced by a tab."""
    return "\t".join(s.split("、"))


def format_time(input_ori: str):
    """Format a numeric YYYYMMDD value as "YYYY-MM-DD".

    The input is converted through ``float`` and ``int`` first, so values
    such as "20180413.0" or the float 20180413.0 are accepted.

    :param input_ori: the date as a number or numeric string; None or a
        blank string means "no value".
    :return: the dash-separated date string, or None when no value was given.
    :raises ValueError: if a non-blank input is not numeric.
    """
    # The emptiness check has to come before the numeric conversion: the
    # original order made it unreachable, so None raised TypeError and ""
    # raised ValueError instead of yielding None.
    if input_ori is None or str(input_ori).strip() == "":
        return None

    ipt = str(int(float(input_ori)))
    return ipt[0:4] + "-" + ipt[4:6] + "-" + ipt[6:]


def province(value: str):
    """Drop one trailing "省" from ``value``.

    Void values and values without that suffix are returned unchanged.
    """
    if is_void(value) or not value.endswith("省"):
        return value
    return value[:-1]


def city(value: str):
    """Shorten a city name.

    A trailing "市" is removed, and "恩施土家族苗族自治州" is mapped to "恩施州".
    Void values and all other names are returned unchanged.
    """
    if is_void(value):
        return value
    if value.endswith("市"):
        return value[:-1]
    if value == "恩施土家族苗族自治州":
        return "恩施州"
    return value


# Full names of autonomous regions and municipalities mapped to their short
# forms; consulted by province_handle() after a trailing "省" has been removed.
province_map = {
    "内蒙古自治区": "内蒙古",
    "新疆维吾尔自治区": "新疆",
    "广西壮族自治区": "广西",
    "宁夏回族自治区": "宁夏",
    "西藏自治区": "西藏",
    "北京市": "北京",
    "上海市": "上海",
    "天津市": "天津",
    "重庆市": "重庆"
}


def ymd(ymdhms_str: str):
    """Convert a date-time string to a date string.

    The value is parsed with ``date_func.str_ymdhms_to_datetime`` and
    rendered with ``date_func.datetime_to_str_ymd``; void input gives None.
    NOTE(review): the exact input/output formats are defined by those
    helpers — confirm there.
    """
    if is_void(ymdhms_str):
        return None
    parsed = date_func.str_ymdhms_to_datetime(ymdhms_str)
    return date_func.datetime_to_str_ymd(parsed)


def province_handle(address_province: str):
    """Normalise a province name to its short form.

    Void input gives None.  Otherwise trailing "省" characters are stripped
    (``rstrip`` removes every trailing occurrence) and the result is replaced
    by its ``province_map`` entry when one exists.
    """
    if is_void(address_province):
        return None

    short_name = address_province
    if short_name.endswith("省"):
        short_name = short_name.rstrip("省")

    # Fall back to the stripped name itself when there is no mapping.
    return province_map.get(short_name, short_name)


def date_append(s: str):
    """Append "-01-01" to ``s``; void values are returned unchanged."""
    return s if is_void(s) else s + "-01-01"


def comma_to_tab(input: str):
    """Re-join a "、"-separated string with tabs.

    Each piece is stripped of surrounding whitespace; void input gives None.
    """
    if is_void(input):
        return None

    return "\t".join(piece.strip() for piece in input.split("、"))


def used_name_to_nested(s: str):
    """Split ``s`` on ";" or tab and wrap each piece as ``{"name": piece}``.

    Void input gives None; otherwise a list of single-key dicts is returned.
    """
    if is_void(s):
        return None

    # NOTE(review): the character class lists ";" twice — possibly one of them
    # was meant to be the full-width "；"; confirm against the source data.
    nested = []
    for name in re.split("[;;\t]", s.strip()):
        nested.append({"name": name})
    return nested


def institution_list_to_nested(s: str):
    """Split ``s`` on "、" and wrap each piece as ``{"name": piece}``.

    Void input gives None; otherwise a list of single-key dicts is returned.
    """
    if is_void(s):
        return None

    names = re.split("、", s.strip())
    return [{"name": institution} for institution in names]


def level_nested_to_nested(s: str):
    """Split ``s`` on ";" or tab and wrap each piece as ``{"level": piece}``.

    Void input gives None; otherwise a list of single-key dicts is returned.
    """
    if is_void(s):
        return None

    # NOTE(review): the character class lists ";" twice — possibly one of them
    # was meant to be the full-width "；"; confirm against the source data.
    nested = []
    for level in re.split("[;;\t]", s.strip()):
        nested.append({"level": level})
    return nested


def other_name_to_nested(s: str):
    """Split ``s`` on "、" and wrap each stripped piece as ``{"name": piece}``.

    Void input gives None; otherwise a list of single-key dicts is returned.
    """
    if is_void(s):
        return None

    return [{"name": alias.strip()} for alias in s.split("、")]


def medical_nested(s: str):
    """Parse comma-separated ">" chains into a list of label dicts.

    The stripped input is split on ","; each part is split on ">" and its
    pieces are stored under "label_1", "label_2", ... in order.  Void input
    gives None.
    """
    if is_void(s):
        return None

    return [
        {f"label_{pos}": piece for pos, piece in enumerate(chain.split(">"), start=1)}
        for chain in re.split(",", s.strip())
    ]


def strip(s: str):
    """Return ``s`` without surrounding whitespace, or None for void input."""
    return None if is_void(s) else s.strip()


def point_to_minus(s: str):
    """Replace every "." in ``s`` with "-"; void input gives None."""
    return None if is_void(s) else s.replace(".", "-")


def major_func(s: str):
    """Wrap ``s`` as an integer string under a fixed key.

    Void input gives None.  Otherwise ``s`` is converted with ``int`` and the
    result is stored as a string under
    "total_industrial_output_value_above_scale".
    """
    if is_void(s):
        return None

    return {"total_industrial_output_value_above_scale": str(int(s))}


def ismedical_1090_func(s: str):
    """Map "是" to "1" and any other non-void value to "0"; void gives None."""
    if is_void(s):
        return None
    return "1" if "是" == s else "0"


def time_market_1090_func(s: str):
    """Strip ``s`` and treat the placeholder "--" as missing.

    Returns None for void input or when the stripped value is "--";
    otherwise returns the stripped value.
    """
    if is_void(s):
        return None

    cleaned = s.strip()
    return None if cleaned == "--" else cleaned


def loads(s: str):
    """Parse ``s`` with ``json.loads``; void input gives None."""
    return None if is_void(s) else json.loads(s)


def dm_map_medical_institution_func(tp: tuple):
    """Assign "medical_id" to a record and key the record by it.

    ``tp`` is an (id, record) pair; the incoming id is ignored.  The id is
    ``md5_str`` of the record's "name" followed by its "address_province",
    with void values treated as "".  Returns (medical_id, record); the record
    is modified in place.
    """
    record: dict = tp[1]

    def text_or_empty(key):
        value = record.get(key)
        return "" if is_void(value) else value

    medical_id = md5_str(text_or_empty("name") + text_or_empty("address_province"))
    record["medical_id"] = medical_id

    return medical_id, record


def dm_map_physical_organization_func(tp: tuple):
    """Assign "org_id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; the incoming id is ignored.  "org_id" is
    ``md5_str`` of the record's "name" (which must be present).  Returns
    (md5_str(org_id), record); the record is modified in place.
    """
    record: dict = tp[1]

    org_id = md5_str(record["name"])
    record["org_id"] = org_id

    return md5_str(org_id), record


def dm_map_social_organization_func(tp: tuple):
    """Assign "org_id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; the incoming id is ignored.  "org_id" is
    ``md5_str`` of the record's "org_name", with a void value treated as "".
    Returns (md5_str(org_id), record); the record is modified in place.
    """
    record: dict = tp[1]

    org_name = record.get("org_name")
    org_id = md5_str("" if is_void(org_name) else org_name)
    record["org_id"] = org_id

    return md5_str(org_id), record


def dm_map_major_tax_illegal_func(tp: tuple):
    """Assign "id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; the incoming id is ignored.  "id" is
    ``md5_str`` of the record's "case_id" followed by its "taxpayer_name",
    with void values treated as "".  Returns (md5_str(id), record); the
    record is modified in place.
    """
    record: dict = tp[1]

    def text_or_empty(key):
        value = record.get(key)
        return "" if is_void(value) else value

    record_id = md5_str(text_or_empty("case_id") + text_or_empty("taxpayer_name"))
    record["id"] = record_id

    return md5_str(record_id), record


def dm_map_industrial_park(tp: tuple):
    """Assign "id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; the incoming id is ignored.  "id" is
    ``md5_str`` of the record's "name" followed by its "address_province",
    with void values treated as "".  Returns (md5_str(id), record); the
    record is modified in place.
    """
    record: dict = tp[1]

    def text_or_empty(key):
        value = record.get(key)
        return "" if is_void(value) else value

    park_id = md5_str(text_or_empty("name") + text_or_empty("address_province"))
    record["id"] = park_id

    return md5_str(park_id), record


def dm_map_research_institution_func(tp: tuple):
    """Assign "institution_id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; only the record is used.  The truthy
    values of "address", "name", "person_liable" and "type" (in that order)
    are passed to ``md5_list`` to form the id.  Returns
    (md5_str(institution_id), record); the record is modified in place.
    """
    record: dict = tp[1]

    key_fields = ("address", "name", "person_liable", "type")
    # Missing or falsy key fields do not contribute to the id.
    key_values = [value for value in map(record.get, key_fields) if value]

    institution_id = md5_list(key_values)
    record["institution_id"] = institution_id
    return md5_str(institution_id), record


def tab_str_replece(s: str):
    """Turn literal backslash-t separators into real tabs.

    ``s`` is split on the two-character sequence ``\\t``; each piece is
    stripped and the pieces are joined with a tab character.  Void input
    gives None.
    """
    if is_void(s):
        return None

    pieces = []
    for piece in s.split("\\t"):
        pieces.append(piece.strip())
    return "\t".join(pieces)


def park_func(tp: tuple):
    """Re-key an (id, record) pair by the record's "park_id"."""
    _, record = tp
    return record["park_id"], record


def research_institution_func(tp: tuple):
    """Re-key an (id, record) pair by the record's "institution_id"."""
    _, record = tp
    return record["institution_id"], record


def cydn_project_func(tp: tuple):
    """Pass an (id, record) pair through unchanged.

    The pair is unpacked and returned as a new two-item tuple; the record is
    not modified.
    """
    doc_id, record = tp
    return (doc_id, record)


def dm_cydn_professional_platform_infos_func(tp: tuple):
    """Derive a new id from "map_id" and "source_doc_id".

    ``tp`` is an (id, record) pair; the incoming id is discarded.  The new id
    is ``md5_str`` of the record's "map_id" followed by its "source_doc_id";
    it is stored under "id" and returned as (new_id, record).
    """
    _, record = tp
    new_id = md5_str(record["map_id"] + record["source_doc_id"])
    record["id"] = new_id
    return new_id, record


def quiz_ask_func(s: str):
    """Split ``s`` on "," or "." and wrap each piece as ``{"label": piece}``.

    The value is converted with ``str`` before splitting and each piece is
    stripped.  Void input gives None.
    """
    if is_void(s):
        return None

    labels = []
    for piece in re.split("[,.]", str(s)):
        labels.append({"label": piece.strip()})
    return labels


def dm_map_flightinspection_info_func(tp: tuple):
    """Assign "record_id" to a record and key the record by its hash.

    ``tp`` is an (id, record) pair; only the record is used.  "record_id" is
    ``md5_str`` of the record's "check_date" followed by its "company_name",
    with void values treated as "".  Returns (md5_str(record_id), record);
    the record is modified in place.
    """
    record: dict = tp[1]

    def text_or_empty(key):
        value = record.get(key)
        return "" if is_void(value) else value

    record_id = md5_str(text_or_empty("check_date") + text_or_empty("company_name"))
    record["record_id"] = record_id

    return md5_str(record_id), record


def label_level(v: str):
    """Map a level label ("一级" .. "五级") to its zero-based rank.

    Returns None for any other value.
    """
    ordered_labels = ("一级", "二级", "三级", "四级", "五级")
    rank_by_label = {label: rank for rank, label in enumerate(ordered_labels)}
    return rank_by_label.get(v)


def dm_map_com_label_keyword_query_func(tp: tuple):
    """Return ``tp`` unchanged, or None when its record has no label level.

    ``tp`` is an (id, record) pair; the pair is dropped (None) when the
    record's "label_level" is void.
    """
    return None if is_void(tp[1].get("label_level")) else tp


def _tmp_1(tp: tuple) -> None:
    """Placeholder mapper: ignores ``tp`` and always returns None."""
    return None


def __tmp(tp: tuple):
    """Re-key an (id, record) pair by the record's 'medical_id'.

    Scratch mapper; the incoming id is discarded and the record is returned
    unmodified.
    """
    _, record = tp
    return record['medical_id'], record


def _filter_value(tp: tuple):
    """Re-key an (id, record) pair by the record's 'medical_id'.

    Despite the name, no values are filtered: the record is returned
    unmodified and only the id is replaced.
    """
    _, record = tp
    return record['medical_id'], record


# Script entry point: builds a TransportConf from the inline config below,
# loads the local source into a view, stamps every record with
# SYS_dm_update_time and caches the view.  The write_to_es calls at the end
# are commented out, so nothing is written as the code stands.
if __name__ == '__main__':
    config = {
        "table": "dm_map_literature_1",
        "need_weight_json": False,
        # Per-field converters, keyed by column name; all entries are
        # currently disabled.
        "field_func": {
            # "major_economic_industry_indicators": major_func
            # "time_market_1090": time_market_1090_func
            # "detail_author_nested": loads,
            # "author": tab_str_replece,
            # "institute": tab_str_replece,
            # "person": loads
            # "other_name": other_name_to_nested,
            # "medical_label": loads,
            # "quiz_ask": quiz_ask_func,
            # "interventions": quiz_ask_func,
            # "complication": quiz_ask_func,
            # "symptom": quiz_ask_func,
            # "drug_name": quiz_ask_func
            # "author": tab_str_replece,
            # "institute": tab_str_replece,
            # "relation_infos": loads,
            # "influence_factors_nested": loads,
            # "influence_factors_nested": loads,
            # "main_supplier": loads,
            # "label_level": label_level
        },
        # Callable[[Tuple[_id, k-v_dict]], Tuple[_id, k-v_dict]]
        "final_func": None,
        # Excel source; a truthy "sheet" entry makes local_to_view() take the
        # Excel path instead of the text path.
        "sheet": {
            "path": r"E:\公司\数据表\1688无年份.xlsx",
            "index": 0,
        },
        # "txt": {
        #     "path": f"{DAOCHU_FOLDER}/dm_map_social_organization.txt",
        #     "index": 0,
        # },
        # "pkey_li": [
        #     "sub_name",
        #     "name"
        # ]
    }

    conf = TransportConf(config)

    view = local_to_view(conf)

    # One timestamp is taken up front so that every record of this run
    # carries the same value.
    SYS_dm_update_time = date_func.current_ymdhms()


    def add_time(tp):
        # tp is an (id, record) pair; the record is stamped in place.
        _id, d = tp
        d["SYS_dm_update_time"] = SYS_dm_update_time
        return _id, d

    view = view.map(add_time)

    # In case the view is written to two ES nodes
    view.cache()


    # view.write_to_es(index=conf.table, node_enum=NodeEnum.LOCAL)
    # view.write_to_es(index=conf.table, node_enum=NodeEnum.NODE_172)
    # view.write_to_es(index=conf.table, node_enum=NodeEnum.TEST)


打赏作者
标签:

发表评论

邮箱地址不会被公开。 必填项已用*标注