paramiko

As of today (5/18/2019), the pip-installed paramiko does not support RSA key files generated by OpenSSH 7.9's ssh-keygen. I had to use puttygen to import the new-format id_rsa and then export it to the older OpenSSH private key format. Otherwise paramiko raises 'paramiko.ssh_exception.SSHException: not a valid RSA private key file'.

If a PEM key file starts with "-----BEGIN OPENSSH PRIVATE KEY-----", it is the new OpenSSH format.
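As an alternative to puttygen, ssh-keygen itself can reportedly rewrite the key into the older PEM format in place (untested here; back up the key file first):

$ ssh-keygen -p -m PEM -f ~/.ssh/id_rsa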

Also, the following code generated lots of warnings; I had to downgrade a package:

$ pip install cryptography==2.4.2
import paramiko
import os

remote_host = '11.22.33.44'
# raw string so the backslashes in the Windows path are not treated as escapes
key_filename = os.path.expanduser(r'~\.ssh\id_rsa.old')

client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(remote_host, username='root', key_filename=key_filename)

stdin, stdout, stderr = client.exec_command('hostname -f')
print('--stdout----------------------')
print(stdout.read())
print('--stderr----------------------')
print(stderr.read())

sftp = client.open_sftp()
sftp.put('a.csv', '/tmp/a.csv')
stdin, stdout, stderr = client.exec_command('ls -lrt /tmp')
print(stdout.read())

sftp.close()
client.close()

When I tried to multi-thread the SSH connections to many hosts, I ran into many unexpected errors. I had to do the following (a combined sketch appears after the list):

  1. Let the main program retrieve the pkey from key_filename, then pass the pkey to the connections, so that the key file does not need to be opened by every thread.
    # put the following in the main module, not in the thread
    key_filename = os.path.expanduser(PRIVATE_KEYFILE)
    private_key = paramiko.RSAKey.from_private_key_file(key_filename)

  2. Disable look_for_keys, which looks for key files in ~/.ssh/ and is almost guaranteed to fail in most of the threads. Note that in the connect method I used pkey instead of the key_filename of the previous examples.
 ssh_client.connect(self._ipaddress, username=USER, pkey=private_key, look_for_keys=False, allow_agent=False, auth_timeout=5.0)
  3. Make the declaration and the use of paramiko.SSHClient close to each other; somehow the SSHClient expires quickly.
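
Putting the three points together, a minimal sketch (the host list is made up; USER and PRIVATE_KEYFILE are placeholders for the values used above):

import os
import threading
import paramiko

USER = 'root'                           # placeholder
PRIVATE_KEYFILE = '~/.ssh/id_rsa.old'   # placeholder, same old-format key as above

# read the key once in the main module, then share it with every thread
private_key = paramiko.RSAKey.from_private_key_file(
    os.path.expanduser(PRIVATE_KEYFILE))

def run_command(ipaddress, command):
    # declare and use the SSHClient close together, inside the thread
    ssh_client = paramiko.SSHClient()
    ssh_client.set_missing_host_key_policy(paramiko.WarningPolicy())
    ssh_client.connect(ipaddress, username=USER, pkey=private_key,
                       look_for_keys=False, allow_agent=False, auth_timeout=5.0)
    stdin, stdout, stderr = ssh_client.exec_command(command)
    print(ipaddress, stdout.read())
    ssh_client.close()

threads = [threading.Thread(target=run_command, args=(ip, 'hostname -f'))
           for ip in ('11.22.33.44', '11.22.33.55')]
for t in threads:
    t.start()
for t in threads:
    t.join()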

paramiko logging

paramiko generates lots of DEBUG output. I had to use the following to mute most of it:

import logging
logging.getLogger("paramiko").setLevel(logging.WARNING)

pyodbc

import pyodbc
'''
Use password to connect
'''
conn = pyodbc.connect('Driver={SQL Server Native Client 11.0};'
                    'Server=11.22.33.44;'
                    'Database=AdventureWorks2017;'
                    'UID=devuser;'
                    'PWD=fakepassword'
)
'''
Use integrated login (Windows).
MARS_Connection=Yes allows opening multiple active cursors.
'''
conn = pyodbc.connect('Driver={SQL Server Native Client 11.0};'
                    'Server=fakeServerP;'
                    'Database=fakeDB;'
                    'MARS_Connection=Yes;'
                    'Trusted_Connection=yes;'
)

cursor = conn.cursor()
cursor.execute('select FirstName, LastName from Person.Person')
for row in cursor:
    print (row)
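
Values are best passed as ? parameters rather than concatenated into the SQL string; a small sketch against the same table ('Smith' is just an example value):

# parameterized query; pyodbc uses ? placeholders
cursor.execute(
    'select FirstName, LastName from Person.Person where LastName = ?',
    'Smith')
for row in cursor.fetchall():
    print(row.FirstName, row.LastName)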

cx_Oracle

import cx_Oracle
connection = cx_Oracle.connect('devuser','fakepassword','11.22.33.44/xepdb1')

cursor = connection.cursor()
queryString = "select A from a"

cursor.execute(queryString)
for abc in cursor:
    print (abc)
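
In cx_Oracle, bind variables use :name placeholders; a minimal sketch against the same made-up table:

# named bind variable; the dict supplies its value
cursor.execute("select A from a where A = :val", {"val": "some_value"})
for row in cursor:
    print(row)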

tqdm

'''
pbar.update(1) advances the bar by 1 unit
'''
import time
from tqdm import tqdm

with tqdm(total=500) as pbar:
    for row in range(500):
        time.sleep(0.01)
        pbar.update(1)
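
The same loop can also be written by wrapping the iterable, letting tqdm count the iterations itself:

for row in tqdm(range(500)):
    time.sleep(0.01)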

Can 2 bars run concurrently? Yes.

# position is the line offset where the bar is drawn (0 = first line)
with tqdm(total=347, position=0) as pbar0:
    with tqdm(total=347, position=1) as pbar1:
        for row in range(347):
            time.sleep(0.01)
            pbar0.update(1)
            pbar1.update(1)

Spark

Set environment

import findspark
findspark.init()  # must run before importing pyspark; it adds pyspark to sys.path

import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.sql ("select 'spark' as hello")
df.show()

Read csv

df = spark.read.csv("a.csv",header=True, inferSchema=True)
df.show()
df.count()
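
With inferSchema=True, Spark samples the file to guess the column types; printSchema() shows what it decided:

df.printSchema()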

Read from JDBC

empDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:oracle:thin:@//hostname.com:1521/service_name") \
    .option("dbtable", "TEST_TABLE") \
    .option("user", "username") \
    .option("password", "fakepassword") \
    .option("driver", "oracle.jdbc.driver.OracleDriver") \
    .load()
empDF.count()
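
This only works if the Oracle JDBC driver jar is on Spark's classpath; one way (the jar path below is an assumption) is to point spark.jars at it when building the session:

from pyspark.sql import SparkSession

# assumption: ojdbc8.jar downloaded locally; adjust the path
spark = SparkSession.builder \
    .config("spark.jars", "/path/to/ojdbc8.jar") \
    .getOrCreate()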

Not tested, but it seemed a good structure:

from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
  sc = spark.sparkContext
  return spark,sc

def main():
  spark,sc = init_spark()
  nums = sc.parallelize([1,2,3,4])
  print(nums.map(lambda x: x*x).collect())


if __name__ == '__main__':
  main()

Not tested: SQLContext (deprecated since Spark 2.0 in favor of SparkSession)

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local', 'example')
sql_sc = SQLContext(sc)

# read a local file into an RDD of lines and materialize it
rdd = sc.textFile('edx.py')
rdd.collect()