v2.0
Steve Nyemba 9 months ago
parent 97a58b416a
commit 00f80d9294

@ -1,3 +1,4 @@
#!/usr/bin/env python
"""
(c) 2019 EDI Parser Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center
@ -10,12 +11,12 @@ This code is intended to process and parse healthcare x12 837 (claims) and x12 8
The claims/outpout can be forwarded to a NoSQL Data store like couchdb and mongodb
Usage :
Commandline :
python x12parser <action>
# parse {x12}
healthcare-io parse <x12_folder>
# export {x12}
healthcare-io export
action:
- parser
- create.plugin
- register.plugin
-
Embedded :
@ -26,11 +27,12 @@ from typing import Optional
from typing_extensions import Annotated
import uuid
import os
import version
import meta
import json
import time
from healthcareio import x12
from healthcareio.x12.parser import X12Parser
import requests
# import healthcareio
# import healthcareio.x12.util
@ -39,11 +41,12 @@ app = typer.Typer()
CONFIG_FOLDER = os.sep.join([os.environ['HOME'],'.healthcareio'])
@app.command(name='init')
def config(email:str,provider:str='sqlite') :
"""
"""\b
Generate configuration file needed with default data store. For supported data-store providers visit https://hiplab.mc.vanderbilt.edu/git/hiplab/data-transport.git
:email your email
\r:provider data store provider (visit https://hiplab.mc.vanderbilt.edu/git/hiplab/data-transport.git)
:provider data store provider (visit https://hiplab.mc.vanderbilt.edu/git/hiplab/data-transport.git)
"""
_db = "healthcareio"
# _PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
@ -62,27 +65,55 @@ def config(email:str,provider:str='sqlite') :
"system":{
"uid":str(uuid.uuid4()),
"email":email,
"version":version.__version__,
"copyright":version.__author__
"version":meta.__version__,
"copyright":meta.__author__
}
}
#
# Let create or retrieve a user's key/token to make sure he/she has access to features they need
# This would also allow us to allow the users to be informed of new versions
#
try:
host = "https://healthcareio.the-phi.com" if 'PARSER_HOST_URL' not in os.environ else os.environ['PARSER_HOST']
url = f"{host}/api/users/signup"
_body = {"email":email,"version":meta.__version__}
_headers = {"content-type":"application/json"}
resp = requests.post(url,headers=_headers,data=json.dumps(_body))
if resp.ok :
_config['system'] = dict(_config['system'],**resp.json())
except Exception as e:
print (e)
pass
# store this on disk
f = open(os.sep.join([CONFIG_FOLDER,'config.json']),'w')
f.write(json.dumps(_config))
f.close()
_msg = f"""
Thank you for considering using our {{x12}} parser verion {meta.__version__}
The generatted configuration file found at {CONFIG_FOLDER}
The database provider is {provider}
visit {host} to learn more about the features,
"""
print (_msg)
@app.command(name='about')
def copyright():
for note in [version.__name__,version.__author__,version.__license__]:
for note in [meta.__name__,meta.__author__,meta.__license__]:
print (note)
pass
@app.command()
def parse (claim_folder:str,plugin_folder:str = None,config_path:str = None):
"""
This function will parse 837 and or 835 claims given a location of parsing given claim folder and/or plugin folder
This function will parse 837 and or 835 claims given a location of parsing given claim folder and/or plugin folder.
plugin_folder folder containing user defined plugins (default are loaded)
config_path default configuration path
"""
_plugins,_parents = x12.plugins.instance(path=plugin_folder)
@ -94,7 +125,7 @@ def parse (claim_folder:str,plugin_folder:str = None,config_path:str = None):
_config = json.loads(f.read())
f.close()
_store = _config['store']
# print (len(_files))
# # print (len(_files))
jobs = []
for _chunks in _files:
pthread = X12Parser(plugins=_plugins,parents=_parents,files=_chunks, store=_store)
@ -103,11 +134,29 @@ def parse (claim_folder:str,plugin_folder:str = None,config_path:str = None):
while jobs :
jobs = [pthread for pthread in jobs if pthread.is_alive()]
time.sleep(1)
pass
else:
pass
# pass
# else:
# pass
print ("...................... FINISHED .........................")
#
#
@app.command(name="export")
def publish (file_type:str,path:str):
"""
This function will export to a different database
file_type values are either claims or remits
path path to export configuration (data transport file)
file_type claims or remits
"""
if file_type in ['837','claims'] :
_type = 'claims'
elif file_type in ['835','remits']:
_type = 'remits'
if __name__ == '__main__' :
app()

@ -7,6 +7,7 @@ import os
import json
# from healthcareio.x12.util
from healthcareio import x12
from healthcareio.x12.util import file, document
import numpy as np
import transport
import copy
@ -14,15 +15,17 @@ import copy
from datetime import datetime
from healthcareio.logger import X12Logger
import time
import pandas as pd
class BasicParser (Process) :
def __init__(self,**_args):
super().__init__()
self._plugins = _args['plugins']
self._parents = _args['parents']
self._files = _args['files']
self._store = _args['store']
self._store = dict(_args['store'],**{'lock':True})
self._template = x12.util.template(plugins=self._plugins)
# self._logger = _args['logger'] if 'logger' in _args else None
self._logger = X12Logger(store = self._store)
if self._logger :
_info = { key:len(self._plugins[key].keys())for key in self._plugins}
@ -34,6 +37,8 @@ class BasicParser (Process) :
This function logs data into a specified location in JSON format
datetime,module,action,data
"""
if self._logger :
self._logger.log(**_args)
pass
def apply(self,**_args):
"""
@ -44,15 +49,17 @@ class BasicParser (Process) :
_content = _args['content']
_filetype = _args['x12']
_doc = _args['document'] #{}
_documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents)
try:
_documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents, logger=self._logger)
try:
_tmp = {}
for _row in _content :
# _data = None
_data,_meta = _documentHandler.bind(row=_row,x12=_filetype)
if _data and _meta :
_doc = _documentHandler.build(data=_data,document=_doc,meta=_meta,row=_row)
# print (['*** ',_doc])
pass
@ -61,15 +68,18 @@ class BasicParser (Process) :
except Exception as e:
#
# Log something here ....
print (_row)
# print (_row)
print (e)
# print (_row,_doc.keys())
pass
return _doc
def run(self):
_handleContent = x12.util.file.Content()
_handleDocument = x12.util.document.Builder(plugins = self._plugins,parents=self._parents)
_handleContent = file.Content() #x12.util.file.Content()
_handleDocument = document.Builder(plugins = self._plugins,parents=self._parents,logger=self._logger)
_template = self._template #x12.util.template(plugins=self._plugins)
#
# @TODO: starting initializing parsing jobs :
# - number of files, plugins meta data
@ -89,10 +99,14 @@ class BasicParser (Process) :
_header = copy.deepcopy(_template[_filetype])
_header = self.apply(content=_content[0],x12=_filetype, document=_header)
_docs = []
_ids = []
for _rawclaim in _content[1:] :
_document = copy.deepcopy(_header) #copy.deepcopy(_template[_filetype])
if 'claim_id' in _document :
#
# @TODO: Have a way to get the attribute for CLP or CLM
_ids.append(_document['claim_id'])
# _document = dict(_document,**_header)
if type(_absolute_path) == str:
_document['filename'] = _absolute_path
@ -108,7 +122,9 @@ class BasicParser (Process) :
_data = {'filename':_location, 'available':len(_content[1:]),'x12':_filetype}
_args = {'module':'parse','action':'parse','data':_data}
_data['parsed'] = len(_docs)
self._logger.log(**_args)
self.log(**_args)
self.log(module='parse',action='file-count', data={'file_name':_absolute_path,'file_type':_filetype,'claims':_ids, 'claim_count':len(_ids)})
#
# Let us submit the batch we have thus far
#
@ -135,78 +151,17 @@ class X12Parser(BasicParser):
_documents = _args['documents']
if _documents :
_store = copy.copy(self._store,**{})
_store = copy.deepcopy(self._store)
TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits'
_store['table'] = TABLE
_store['cotnext'] = 'write'
_writer = transport.factory.instance(**_store)
_writer.write(_documents)
_writer.write(_documents,table=TABLE)
if getattr(_writer,'close') :
_writer.close()
#
# LOG: report what was written
_data = {'x12':_args['x12'], 'documents':len(_documents),'filename':_args['filename']}
self._logger.log(module='write',action='write',data=_data)
# def instance (**_args):
# """
# :path
# """
# # _files = x12.util.Files.get(_args['file'])
# # #
# # # We can split these files (multi-processing)
# # #
# # _jobCount = 1 if 'jobs' not in _args else int (_args['jobs'])
# # _files = np.array_split(_files,_jobCount)
# # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
# # if 'config' in _args :
# # PATH = _args['config']
# # f = open(PATH)
# # _config = json.loads(f.read())
# # f.close()
# # jobs = []
# # for _batch in _files :
# # pthread = Parser(files=_batch,config=_config)
# # pthread.start()
# # jobs.append(pthread)
# # time.sleep(1)
# pass
# class parser (Process) :
# _CONFIGURATION = {}
# def __init__(self,path=None) :
# if not parser._CONFIGURATION :
# _path = path if path else os.sep.join([os.environ['HOME'],'.healthcareio/config.json'])
# #
# # @TODO: Load custom configuration just in case we need to do further processing
# config = json.loads(open(path).read())
# parser._CONFIGURATION = config['parser']
# #
# # do we have a custom configuration in this location
# #
# _custompath = _path.replace('config.json','')
# _custompath = _custompath if not _custompath.endswith(os.sep) else _custompath[:-1]
# _custompath = os.sep.join([_custompath,'custom'])
# if os.exists(_custompath) :
# files = os.listdir(_custompath)
# if files :
# _filename = os.sep.join([_custompath,files[0]])
# _customconf = json.loads(open(_filename).read())
# #
# # merge with existing configuration
# else:
# pass
# #
# #
# class getter :
# def value(self,) :
# pass
# class setter :
# def files(self,files):
# pass
# self._logger.log(module='write',action='write',data=_data)
self.log(module='parse',action='write',data=_data)

@ -1,7 +1,7 @@
import numpy as np
from .. import parser
from datetime import datetime
@parser(element='NM1',x12='*', anchor={'41':'submitter','40':'receiver','82':'rendering_provider','85':'billing_provider','87':'pay_to_provider','IL':'patient','PR':'payer','QC':'patient','DN':'referring_provider','77':'provider','2':'billing_provider'}, map={1:'type',3:'name',-1:'id'})
@parser(element='NM1',x12='*', anchor={'41':'submitter','40':'receiver','82':'rendering_provider','85':'billing_provider','87':'pay_to_provider','IL':'patient','PR':'payer','QC':'patient','DN':'referring_provider','77':'provider','2':'billing_provider'}, map={1:'type',3:'name_1',4:'name_2',-1:'id'})
def NM1 (**_args):
"""
Expected Element NM1

@ -57,9 +57,9 @@ def SVC (**_args):
_data['paid_amount'] = np.float64(_data['paid_amount'])
return _data
pass
@parser(element='N1',x12='835',anchor={'PR':'provider'},map={1:'name'})
def N1(**_args):
pass
# @parser(element='N1',x12='835',anchor={'PR':'provider'},map={1:'name'})
# def N1(**_args):
# pass
@parser(element='N3',x12='835',parent='N1',map={1:'address_line_1'})
def N3(**_args):
pass

@ -15,7 +15,7 @@ class Builder:
self._plugins = copy.deepcopy(_args['plugins'])
self._parents = copy.deepcopy(_args['parents'])
self._loop = {}
self._logger = None if 'logger' not in _args else _args['logger']
def reset (self):
self._last = {}
@ -33,33 +33,6 @@ class Builder:
return self._last[_id] if _id in self._last else None
return None
# if _id in self._parents :
# self._last[_id] =
# if 'parent' in _meta : #hasattr(_meta,'parent'):
# _hasField = 'field' in _meta
# _hasParent= _meta['element'] in self._parents
# if _hasField and _hasParent: #_meta.element in self._parents and hasattr(_meta,'field'):
# self._last = _item
# pass
# else:
# for key in self._parents :
# if _meta['element'] in self._parents[key] :
# _ikey = list(self._last.keys())[0]
# _oldinfo = self._last[_ikey]
# if type(_oldinfo) != dict :
# #
# # Only applicable against a dictionary not a list (sorry)
# pass
# else:
# _item = {_ikey: self.merge(_oldinfo,_item)}
# break
# pass
# return _item
def count(self,_element):
if _element not in self._loop :
self._loop[_element] = 0
@ -251,10 +224,12 @@ class Builder:
if _field :
if 'container' in _meta and type(_document[_field]) != list :
_document[_field] = []
if _field and _document :
if _field not in _document :
_document[_field] =_data
pass
else:
if 'container' in _meta :
_document[_field].append(_data)
@ -263,9 +238,23 @@ class Builder:
else:
if not _field and 'anchor' in _meta :
#
# This is an unusual situation ...
# We should determine if the element is either a parent or has a parent
# This would allow us to avoid having runaway attributes and undermine structural integrity
#
#
# The element has NOT been specified by the plugin (alas)
# For this case we would advise writing a user-defined plugin to handle this case
#
print (self._logger)
if self._logger :
print (['....................'])
self._logger.log(action='missing-plugin',module='build',data={'element':_row[0],'anchor':_row[1]})
return _document
pass
# print ([_row[0],set(_data) - set(_document.keys())])
_document = self.merge(_document,_data)
return _document

@ -14,13 +14,13 @@ args = {
"author":meta.__author__,
"author_email":"steve.l.nyemba@vumc.org",
"include_package_data":True,
"license":version.__license__,
"license":meta.__license__,
"packages":find_packages(),
"keywords":["healthcare","edi","x12","analytics","835","837","data","transport","protocol"]
}
args["install_requires"] = ['typer','flask-socketio','seaborn','jinja2','jsonmerge', 'weasyprint','data-transport@git+https://healthcareio.the-phi.com/git/code/transport.git','pymongo','numpy','cloudant','pika','boto','botocore','flask-session','smart_open','smart-top@git+https://healthcareio.the-phi.com/git/code/smart-top.git@data-collector']
args["install_requires"] = ['typer','flask-socketio','seaborn','jinja2','jsonmerge', 'weasyprint','data-transport@git+https://healthcareio.the-phi.com/git/code/transport.git','pymongo','numpy','cloudant','pika','boto','botocore','flask-session','smart_open']
args['url'] = 'https://hiplab.mc.vanderbilt.edu'
args['scripts']= ['healthcareio/healthcare-io.py']
args['scripts']= ['bin/healthcare-io']
# args['entry_points'] = {
# 'console_scripts' : ['healthcareio=healthcareio:register']
# }

Loading…
Cancel
Save