# 1. Nacos API
#
# This code covers listing services, listing the instances of a given
# service, bringing instances online/offline, registering instances and
# modifying instance information. For more details on the Nacos API see
# https://nacos.io/zh-cn/docs/open-api.html
#
# NOTE(review): this section was reconstructed from a paste that had lost its
# quotes, operators, slashes and line breaks. URL paths follow the Nacos v1
# HTTP API; literal values (e.g. the 'merchantorder' service name) are kept
# as they appeared and may have lost a '-' or '_' -- confirm before use.
import requests


class NacosAPI(object):
    """Small client for a subset of the Nacos v1 HTTP API.

    Every method prints the raw response text (as the original did) and also
    returns it so callers can use the result programmatically.
    """

    def __init__(self, url, username, password):
        """Log in to Nacos and remember the access token.

        Args:
            url: Base URL of the Nacos server, e.g. ``http://host:8848``.
            username: Nacos user name.
            password: Nacos password.

        Raises:
            KeyError: If the login response carries no ``accessToken``
                (for example when the credentials are rejected).
        """
        self.auth, self.url = {'username': username, 'password': password}, url
        self.response = requests.post(
            '%s/nacos/v1/auth/users/login' % self.url,
            data=self.auth,
            verify=False,
        ).json()
        # Sent as a query parameter on every subsequent request.
        self.params = (('accessToken', self.response['accessToken']),)

    def servicesList(self):
        """List services (first page, up to 100 entries)."""
        query = self.params + (('pageNo', 1), ('pageSize', 100))
        data = requests.get(
            '%s/nacos/v1/ns/catalog/services' % self.url,
            params=query,
        ).text
        print(data)
        return data

    def instanceList(self, service):
        """List the instances registered under the service named *service*."""
        # Passing the name through ``params`` lets requests URL-encode it.
        query = self.params + (('serviceName', service),)
        data = requests.get(
            '%s/nacos/v1/ns/instance/list' % self.url,
            params=query,
        ).text
        print(data)
        return data

    def changeStatus(self, serviceName, ip, port, enabled):
        """Bring an instance online or offline.

        Args:
            serviceName: Service the instance belongs to.
            ip: Instance IP.
            port: Instance port.
            enabled: ``true`` to bring the instance online, ``false`` to take
                it offline.
        """
        self.data = {
            'serviceName': serviceName,
            'ip': ip,
            'port': port,
            'enabled': enabled,
        }
        data = requests.put(
            '%s/nacos/v1/ns/instance' % self.url,
            params=self.params,
            data=self.data,
            verify=False,
        ).text
        print(data)
        return data

    def register(self, serviceName, ip, port):
        """Register an instance of *serviceName* at *ip*:*port*."""
        url = '%s/nacos/v1/ns/instance' % self.url
        # The access token was missing here; with authentication enabled the
        # server rejects the call without it.
        data = requests.post(
            url,
            params=self.params,
            data={'ip': ip, 'port': port, 'serviceName': serviceName},
        )
        print(data.text)
        return data.text

    def instanceChange(self, serviceName='merchantorder', ip='10.42.4.129',
                       port=8080, clusterName='DEFAULT', weight=1,
                       enabled=False):
        """Modify an instance's metadata.

        The defaults are the values that used to be hard-coded in the body,
        so calling the method without arguments behaves as before.
        """
        data = {
            'serviceName': serviceName,
            'ip': ip,
            'port': port,
            'clusterName': clusterName,
            'weight': weight,
            'enabled': enabled,
        }
        response = requests.put(
            url='%s/nacos/v1/ns/instance' % self.url,
            params=self.params,  # token was missing in the original call
            data=data,
        )
        print(response.text)
        return response.text


if __name__ == '__main__':
    nacos = NacosAPI('http://192.168.3.160:8848', 'nacos', 'NACOS')
    # The original demo called register() with no arguments, which raises
    # TypeError; the sample values below are the ones used elsewhere in this
    # section -- replace them with a real instance.
    nacos.register('merchantorder', '10.42.4.129', 8080)


# 2. Apollo portal (web UI) API
# The Apollo open platform offers no API for granting authorization, so the
# portal's own web API was captured to implement bulk authorization. An app
# must be authorized this way before it can be managed through the open
# platform.
#
# NOTE(review): sections 2 and 3 were reconstructed from a paste that had lost
# its quotes, operators, slashes, underscores and line breaks. Identifiers are
# kept exactly as they appeared; URL paths, header names and literal values
# should be confirmed against a working copy.
import requests


class apolloweb():
    """Session-cookie client for the Apollo portal web UI."""

    def __init__(self, potalurl, username, password):
        """Sign in to the portal and keep the ``SESSION`` cookie.

        Args:
            potalurl: Base URL of the Apollo portal, ending with ``/``
                (the request paths are appended directly to it).
            username: Portal user name.
            password: Portal password.

        Raises:
            RuntimeError: If the sign-in response sets no ``SESSION`` cookie.
        """
        self.potalurl, self.username, self.password = potalurl, username, password
        data = {'username': username, 'password': password}
        # Redirects are not followed so the cookie set by the sign-in
        # response itself can be read.
        response = requests.post(f'{self.potalurl}signin', data=data,
                                 allow_redirects=False)
        session = dict(response.cookies).get('SESSION')
        if session is None:
            raise RuntimeError(
                'Apollo portal sign-in returned no SESSION cookie '
                '(HTTP %s)' % response.status_code
            )
        self.SESSION = session

    def AuthorizeApp(self, Token, app: str):
        """Grant the consumer identified by *Token* the app role on *app*.

        Args:
            Token: Consumer token of the third-party application.
            app: Apollo appId to authorize.
        """
        cookies = {'NG_TRANSLATE_LANG_KEY': 'zh-CN', 'SESSION': self.SESSION}
        params = (('envs', ['DEV', 'SIT']), ('type', 'AppRole'))
        headers = {'Content-Type': 'application/json'}
        # ``json=`` serializes the body safely instead of formatting the
        # appId into a JSON string by hand.
        response = requests.post(
            f'{self.potalurl}consumers/{Token}/assign-role',
            params=params,
            headers=headers,
            cookies=cookies,
            json={'appId': app},
            verify=False,
        )
        print(response.text)
        return response.text


if __name__ == '__main__':
    # The original demo built apolloweb() with no arguments, which raises
    # TypeError. These are placeholders -- replace with the real portal URL
    # and credentials.
    apollo = apolloweb('http://localhost:8070/', 'apollo', 'admin')
    apollo.AuthorizeApp('Token', 'sms')


# 3. Apollo open platform API
#
# This code covers reading configuration, modifying configuration, releasing
# configuration and bulk releases. Bulk release is the feature used most
# often in day-to-day work.
import json

import requests
from loguru import logger


class ApolloAPI(object):
    """Client for the Apollo open platform (``/openapi/v1``) endpoints."""

    def __init__(self, apolloenv):
        """Select the Apollo deployment to talk to.

        Args:
            apolloenv: Key of the deployment, ``'aliyun'`` or ``'local'``.

        Raises:
            KeyError: If *apolloenv* is not a known deployment.
        """
        # NOTE(review): access tokens are hard-coded in source; consider
        # loading them from the environment or a secrets store.
        apolloinfo = {
            'aliyun': {
                'url': 'http://apollo.ali.cn',
                'token': 'a2814e7476b64fbdb08d9e9ec0b2d2d76e621cac',
            },
            'local': {
                'url': 'http://apollo.tenx.cn',
                'token': 'a2814e7476b64fbdb08d9e9ec0b2d2d76e621cac',
            },
        }
        if apolloenv not in apolloinfo:
            raise KeyError(
                'unknown apolloenv %r, expected one of %s'
                % (apolloenv, sorted(apolloinfo))
            )
        self.apollourl = apolloinfo[apolloenv].get('url')
        self.apps = ['zuul', 'omc', 'oms', 'omss']
        self.headers = {
            'Authorization': apolloinfo[apolloenv].get('token'),
            # The original value lacked the media type; the open platform
            # documents 'application/json;charset=UTF-8'.
            'Content-Type': 'application/json;charset=UTF-8',
        }

    def GetnamespaceName(self, env, appid, clusterName='default'):
        """Return the names of all namespaces of *appid* in a cluster."""
        url = '%s/openapi/v1/envs/%s/apps/%s/clusters/%s/namespaces' % (
            self.apollourl, env, appid, clusterName)
        data = requests.get(url, headers=self.headers).json()
        return [x['namespaceName'] for x in data]

    def AddAppConfig(self, env, appId, namespaceName, key, value, comment,
                     clusterName='default'):
        """Add a configuration item to a namespace and print the outcome."""
        url = '%s/openapi/v1/envs/%s/apps/%s/clusters/%s/namespaces/%s/items' % (
            self.apollourl, env, appId, clusterName, namespaceName)
        data = {
            'key': key,
            'value': value,
            'comment': comment,
            'dataChangeCreatedBy': 'apollo',
        }
        respose = requests.post(url, json=data, headers=self.headers)
        print('新增配置:环境:%s,appId:%s,namespaceName:%s,key:%s,value:%s,新增配置结果:%s'
              % (env, appId, namespaceName, key, value, respose.text))

    def ReleasesAppOneNamespaceName(self, env, appId, namespaceName,
                                    clusterName='default'):
        """Release one namespace of *appId*.

        Returns:
            A human-readable status string: success, or failure with the
            HTTP status code.
        """
        url = '%s/openapi/v1/envs/%s/apps/%s/clusters/%s/namespaces/%s/releases' % (
            self.apollourl, env, appId, clusterName, namespaceName)
        data = {
            'releaseTitle': 'releaseTitle',
            'releaseComment': 'releaseComment',
            'releasedBy': 'apollo',
        }
        respose = requests.post(url, json=data, headers=self.headers)
        logger.info('ReleasesAppOneNamespaceName:respose:%s' % (respose.text))
        if respose.status_code == 200:
            return '发布成功'
        return '发布失败,HTTP状态码:%s' % respose.status_code

    def ReleasesAppAllNamespaceName(self, env, appId, clusterName='default'):
        """Release every namespace of *appId*; return one status per namespace."""
        result = []
        for namespaceName in self.GetnamespaceName(env, appId, clusterName):
            logger.info('ReleasesAppAllNamespaceName:app:%s,namespaceName:%s'
                        % (appId, namespaceName))
            result.append(self.ReleasesAppOneNamespaceName(
                env, appId, namespaceName, clusterName))
        return result

    def ReleasesAllApps(self, env='FINDEV'):
        """Release every app in ``self.apps`` for *env*.

        A failure for one app is logged and does not stop the remaining apps.
        """
        for app in self.apps:
            try:
                result = self.ReleasesAppAllNamespaceName(env, app)
                logger.info('app:%s,%s' % (app, result))
            except Exception as e:
                # Deliberately best-effort, but a failure is an error, not
                # an informational event.
                logger.error('ReleasesAllApps:%s,error:%s' % (app, e))

    def BeforeReleasesDetailAll(self, env, appId, clusterName='default'):
        """Return the configuration as it would be after the next release.

        Returns:
            ``{namespaceName: {key: value}}`` built from each namespace's
            current items.

        Raises:
            Exception: If the request fails or the response does not have the
                expected shape; the original error is chained as the cause.
        """
        url = '%s/openapi/v1/envs/%s/apps/%s/clusters/%s/namespaces' % (
            self.apollourl, env, appId, clusterName)
        try:
            respose = requests.get(url, headers=self.headers).json()
            return {
                x['namespaceName']: {i['key']: i['value'] for i in x['items']}
                for x in respose
            }
        except Exception as e:
            print('BeforeReleasesDetailAll:%s:%s' % (env, appId))
            # Replaces the placeholder message 'sdsdsdsd' and keeps the cause.
            raise RuntimeError(
                'BeforeReleasesDetailAll failed for env=%s appId=%s: %s'
                % (env, appId, e)
            ) from e

    def CurrentReleasesedDetailAll(self, env, appId, clusterName='default'):
        """Return the currently released configuration of every namespace.

        Returns:
            ``{namespaceName: configurations}`` taken from each namespace's
            latest release; namespaces whose response body is empty are
            skipped.
        """
        result = {}
        for namespaceName in self.GetnamespaceName(env, appId, clusterName):
            url = ('%s/openapi/v1/envs/%s/apps/%s/clusters/%s/namespaces/%s'
                   '/releases/latest') % (
                self.apollourl, env, appId, clusterName, namespaceName)
            respose = requests.get(url, headers=self.headers)
            if respose.text:
                resposejson = respose.json()
                result[resposejson['namespaceName']] = resposejson['configurations']
        return result


if __name__ == '__main__':
    # The original demo used 'ali', which is not a key of the deployment
    # table and raised KeyError.
    apollo = ApolloAPI('aliyun')
    apollo.ReleasesAllApps('PRE')


# 4. K8S API
# This code supports credentials given either as a kubeconfig file or as
# kubeconfig text, so it can be adapted to the situation at hand.
#
# NOTE(review): this section was reconstructed from a paste that had lost its
# quotes, operators, underscores and line breaks. Method names, parameter
# names and result-dict keys are kept exactly as they appeared; Kubernetes
# client calls use the client's real names.
from kubernetes import client, config
from kubernetes.client.rest import ApiException


class KubernetesAPI:
    """Convenience wrapper around the Kubernetes Python client.

    Query methods return ``{'code': 200, 'data': ...}`` on success and
    ``{'code': 500, 'msg': <error text>}`` when the API call fails.
    """

    def __init__(self, kubeconf=None, kubeconftext=None):
        """Load cluster credentials from a file or from kubeconfig text.

        The source defined ``__init__`` twice (file variant, then text
        variant), so the second silently replaced the first; both are merged
        here. A single positional argument is treated as a file path, which
        is how the demo at the bottom calls it.

        Args:
            kubeconf: Path to a kubeconfig file. ``None`` uses the client's
                default location.
            kubeconftext: kubeconfig content as a YAML string. Takes
                precedence over *kubeconf* when given.
        """
        if kubeconftext is not None:
            # The original used ``yaml`` without importing it.
            import yaml

            # safe_load replaces yaml.load(..., Loader=yaml.Loader), which
            # can construct arbitrary Python objects from the input.
            yamldata = yaml.safe_load(kubeconftext)
            loader = config.kube_config.KubeConfigLoader(yamldata)
            callconfig = client.Configuration()
            loader.load_and_set(callconfig)
            client.Configuration.set_default(callconfig)
        else:
            config.load_kube_config(kubeconf)

    @staticmethod
    def _podsummary(pod):
        """Return the fields reported for one pod."""
        return {
            'uid': pod.metadata.uid,
            'name': pod.metadata.name,
            'podip': pod.status.pod_ip,
            'hostip': pod.status.host_ip,
            'creattime': pod.metadata.creation_timestamp,
        }

    def listnode(self):
        """List cluster nodes."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_node(watch=False)
            # Key was 'data:' (stray colon) in the source; siblings use 'data'.
            result = {'code': 200, 'data': [
                {
                    'uid': x.metadata.uid,
                    'name': x.metadata.name,
                    'creattime': x.metadata.creation_timestamp,
                    # The Python client spells this attribute pod_cid_rs.
                    'podcidrs': x.spec.pod_cid_rs,
                    'podcidr': x.spec.pod_cidr,
                    'unschedulable': x.spec.unschedulable,
                }
                for x in ret.items
            ]}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def listnamespace(self):
        """List namespaces."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespace(watch=False)
            # Key was 'data:' (stray colon) in the source; siblings use 'data'.
            result = {'code': 200, 'data': [
                {
                    'uid': x.metadata.uid,
                    'name': x.metadata.name,
                    'creattime': x.metadata.creation_timestamp,
                }
                for x in ret.items
            ]}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def listdeploymet(self, namespaces):
        """List the deployments in the namespace *namespaces*."""
        v1 = client.AppsV1Api()
        try:
            ret = v1.list_namespaced_deployment(namespaces, watch=False)
            result = {'code': 200, 'data': [
                {
                    'uid': x.metadata.uid,
                    'name': x.metadata.name,
                    'namespace': x.metadata.namespace,
                    'replicas': x.spec.replicas,
                    'status': x.status.available_replicas,
                    'creattime': x.metadata.creation_timestamp,
                }
                for x in ret.items
            ]}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def listpods(self, namespace):
        """List the pods in *namespace*."""
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespaced_pod(namespace)
            result = {'code': 200,
                      'data': [self._podsummary(x) for x in ret.items]}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def listpodsdeployment(self, namespace, deployment):
        """List the pods of *deployment*, matched by label.

        Pods are selected with ``app.kubernetes.io/name=<deployment>``.
        """
        v1 = client.CoreV1Api()
        try:
            ret = v1.list_namespaced_pod(
                namespace,
                label_selector='app.kubernetes.io/name=%s' % deployment,
            )
            result = {'code': 200,
                      'data': [self._podsummary(x) for x in ret.items]}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def querypodstdoutlogs(self, namespaces, pod):
        """Return the stdout log of *pod* in the namespace *namespaces*."""
        v1 = client.CoreV1Api()
        try:
            result = {'code': 200,
                      'data': v1.read_namespaced_pod_log(pod, namespaces)}
        except ApiException as e:
            result = {'code': 500, 'msg': str(e)}
        return result

    def querydeploymentstdoutlogs(self, namespace, deployment):
        """Return the logs of every pod belonging to *deployment*.

        Returns:
            On success, a list of ``{'name': <pod>, 'logs': <log result>}``;
            if the pod lookup fails, a ``{'code': 500, 'msg': ...}`` dict.
            The two shapes are kept as in the original.
        """
        querypods = self.listpodsdeployment(namespace, deployment)
        if querypods['code'] != 200:
            return {'code': 500, 'msg': '查询pod出错:%s' % querypods['msg']}
        return [
            {'name': x['name'],
             'logs': self.querypodstdoutlogs(namespace, x['name'])}
            for x in querypods['data']
        ]

    def removepod(self, namespace, podname):
        """Delete the pod *podname* in *namespace*."""
        v1 = client.CoreV1Api()
        try:
            v1.delete_namespaced_pod(podname, namespace)
            return {'code': 200, 'msg': '删除成功'}
        except ApiException as e:
            return {'code': 500, 'msg': str(e)}

    def deploymentsetvar(self, namespace, deployment, key, value):
        """Set environment variable *key*=*value* on a deployment.

        The patch targets the container whose name equals *deployment*.
        """
        v1 = client.AppsV1Api()
        body = {'spec': {'template': {'spec': {'containers': [
            {'env': [{'name': key, 'value': value}], 'name': deployment},
        ]}}}}
        try:
            v1.patch_namespaced_deployment(
                name=deployment, namespace=namespace, body=body)
            return {'code': 200, 'msg': '设置成功'}
        except ApiException as e:
            return {'code': 500, 'msg': str(e)}


if __name__ == '__main__':
    ss = KubernetesAPI(r'D:\Software\config')
    sss = ss.querydeploymentstdoutlogs('dev', 'qtz')
    print(sss)