Fork from TTPSC: SAE J1939 for Python
=====================================
|docs|
.. |docs| image:: https://readthedocs.org/projects/j1939/badge/?version=latest
:target: https://j1939.readthedocs.io/en/stable/
:alt: Documentation build Status
Overview
--------
This is a fork of the https://github.com/juergenH87/python-can-j1939 project, it essentially adds the possibility to get the origin of the message.
For more information please check the readme file in the author's project.
Installation
------------
Install can-j1939 with pip::
$ pip install can-j1939-ttpsc
or do the trick with::
$ git clone https://github.com/ttpscmolinaf/python-can-j1939
$ cd j1939
$ pip install .
Upgrade
------------
Upgrade an already installed can-j1939 package::
$ pip install --upgrade can-j1939
Quick start
-----------
To simply receive all passing (public) messages on the bus you can subscribe to the ECU object.
.. code-block:: python
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
def on_message(priority, pgn, sa, timestamp, data):
"""Receive incoming messages from the bus
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param int sa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)))
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(bustype='socketcan', channel='can0')
# ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
# ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
# subscribe to all (global) messages on the bus
ecu.subscribe(on_message)
time.sleep(120)
print("Deinitializing")
ecu.disconnect()
if __name__ == '__main__':
main()
A more sophisticated example in which the CA class was overloaded to include its own functionality:
.. code-block:: python
import logging
import time
import can
import j1939
logging.getLogger('j1939').setLevel(logging.DEBUG)
logging.getLogger('can').setLevel(logging.DEBUG)
# compose the name descriptor for the new ca
name = j1939.Name(
arbitrary_address_capable=0,
industry_group=j1939.Name.IndustryGroup.Industrial,
vehicle_system_instance=1,
vehicle_system=1,
function=1,
function_instance=1,
ecu_instance=1,
manufacturer_code=666,
identity_number=1234567
)
# create the ControllerApplications
ca = j1939.ControllerApplication(name, 128)
def ca_receive(priority, pgn, source, timestamp, data):
"""Feed incoming message to this CA.
(OVERLOADED function)
:param int priority:
Priority of the message
:param int pgn:
Parameter Group Number of the message
:param intsa:
Source Address of the message
:param int timestamp:
Timestamp of the message
:param bytearray data:
Data of the PDU
"""
print("PGN {} length {}".format(pgn, len(data)))
def ca_timer_callback1(cookie):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 8 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8
# sending normal broadcast message
ca.send_pgn(0, 0xFD, 0xED, 6, data)
# sending normal peer-to-peer message, destintion address is 0x04
ca.send_pgn(0, 0xE0, 0x04, 6, data)
# returning true keeps the timer event active
return True
def ca_timer_callback2(cookie):
"""Callback for sending messages
This callback is registered at the ECU timer event mechanism to be
executed every 500ms.
:param cookie:
A cookie registered at 'add_timer'. May be None.
"""
# wait until we have our device_address
if ca.state != j1939.ControllerApplication.State.NORMAL:
# returning true keeps the timer event active
return True
# create data with 100 bytes
data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100
# sending multipacket message with TP-BAM
ca.send_pgn(0, 0xFE, 0xF6, 6, data)
# sending multipacket message with TP-CMDT, destination address is 0x05
ca.send_pgn(0, 0xD0, 0x05, 6, data)
# returning true keeps the timer event active
return True
def main():
print("Initializing")
# create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
ecu = j1939.ElectronicControlUnit()
# Connect to the CAN bus
# Arguments are passed to python-can's can.interface.Bus() constructor
# (see https://python-can.readthedocs.io/en/stable/bus.html).
# ecu.connect(bustype='socketcan', channel='can0')
# ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
# ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
# ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
# ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
# ecu.connect('testchannel_1', bustype='virtual')
# add CA to the ECU
ecu.add_ca(controller_application=ca)
ca.subscribe(ca_receive)
# callback every 0.5s
ca.add_timer(0.500, ca_timer_callback1)
# callback every 5s
ca.add_timer(5, ca_timer_callback2)
# by starting the CA it starts the address claiming procedure on the bus
ca.start()
time.sleep(120)
print("Deinitializing")
ca.stop()
ecu.disconnect()
if __name__ == '__main__':
main()
Credits
-------
This implementation was taken from https://github.com/juergenH87/python-can-j1939, as we needed to get information of the bus for the message that was received by the library.
Thanks for your great work Juergen :)!
.. _python-can: https://python-can.readthedocs.org/en/stable/
.. _Copperhill technologies: http://copperhilltech.com/a-brief-introduction-to-the-sae-j1939-protocol/
Raw data
{
"_id": null,
"home_page": "https://github.com/ttpscmolinaf/python-can-j1939",
"name": "can-j1939-ttpsc",
"maintainer": null,
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "CAN SAE J1939 J1939-FD J1939-22",
"author": "TTPSC",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/d3/94/7e58869bf455eea425121139ae1488997ed11da26b849a41fa759b0d81da/can-j1939-ttpsc-1.0.4.tar.gz",
"platform": "any",
"description": "Fork from TTPSC: SAE J1939 for Python\n=====================================\n\n|docs|\n\n.. |docs| image:: https://readthedocs.org/projects/j1939/badge/?version=latest\n :target: https://j1939.readthedocs.io/en/stable/\n :alt: Documentation build Status\n\nOverview\n--------\n\nThis is a fork of the https://github.com/juergenH87/python-can-j1939 project, it essentially adds the possibility to get the origin of the message.\nFor more information please check the readme file in the author's project.\n\nInstallation\n------------\n\nInstall can-j1939 with pip::\n\n $ pip install can-j1939-ttpsc\n\nor do the trick with::\n\n $ git clone https://github.com/ttpscmolinaf/python-can-j1939\n $ cd j1939\n $ pip install .\n\nUpgrade\n------------\n\nUpgrade an already installed can-j1939 package::\n\n $ pip install --upgrade can-j1939\n\n\nQuick start\n-----------\n\nTo simply receive all passing (public) messages on the bus you can subscribe to the ECU object.\n\n.. code-block:: python\n\n import logging\n import time\n import can\n import j1939\n\n logging.getLogger('j1939').setLevel(logging.DEBUG)\n logging.getLogger('can').setLevel(logging.DEBUG)\n\n def on_message(priority, pgn, sa, timestamp, data):\n \"\"\"Receive incoming messages from the bus\n\n :param int priority:\n Priority of the message\n :param int pgn:\n Parameter Group Number of the message\n :param int sa:\n Source Address of the message\n :param int timestamp:\n Timestamp of the message\n :param bytearray data:\n Data of the PDU\n \"\"\"\n print(\"PGN {} length {}\".format(pgn, len(data)))\n\n def main():\n print(\"Initializing\")\n\n # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)\n ecu = j1939.ElectronicControlUnit()\n\n # Connect to the CAN bus\n # Arguments are passed to python-can's can.interface.Bus() constructor\n # (see https://python-can.readthedocs.io/en/stable/bus.html).\n # ecu.connect(bustype='socketcan', channel='can0')\n # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)\n ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)\n # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)\n # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)\n # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)\n\n # subscribe to all (global) messages on the bus\n ecu.subscribe(on_message)\n\n time.sleep(120)\n\n print(\"Deinitializing\")\n ecu.disconnect()\n\n if __name__ == '__main__':\n main()\n\nA more sophisticated example in which the CA class was overloaded to include its own functionality:\n\n.. code-block:: python\n\n import logging\n import time\n import can\n import j1939\n\n logging.getLogger('j1939').setLevel(logging.DEBUG)\n logging.getLogger('can').setLevel(logging.DEBUG)\n\n # compose the name descriptor for the new ca\n name = j1939.Name(\n arbitrary_address_capable=0,\n industry_group=j1939.Name.IndustryGroup.Industrial,\n vehicle_system_instance=1,\n vehicle_system=1,\n function=1,\n function_instance=1,\n ecu_instance=1,\n manufacturer_code=666,\n identity_number=1234567\n )\n\n # create the ControllerApplications\n ca = j1939.ControllerApplication(name, 128)\n\n\n def ca_receive(priority, pgn, source, timestamp, data):\n \"\"\"Feed incoming message to this CA.\n (OVERLOADED function)\n :param int priority:\n Priority of the message\n :param int pgn:\n Parameter Group Number of the message\n :param intsa:\n Source Address of the message\n :param int timestamp:\n Timestamp of the message\n :param bytearray data:\n Data of the PDU\n \"\"\"\n print(\"PGN {} length {}\".format(pgn, len(data)))\n\n def ca_timer_callback1(cookie):\n \"\"\"Callback for sending messages\n\n This callback is registered at the ECU timer event mechanism to be\n executed every 500ms.\n\n :param cookie:\n A cookie registered at 'add_timer'. May be None.\n \"\"\"\n # wait until we have our device_address\n if ca.state != j1939.ControllerApplication.State.NORMAL:\n # returning true keeps the timer event active\n return True\n\n # create data with 8 bytes\n data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8\n\n # sending normal broadcast message\n ca.send_pgn(0, 0xFD, 0xED, 6, data)\n\n # sending normal peer-to-peer message, destintion address is 0x04\n ca.send_pgn(0, 0xE0, 0x04, 6, data)\n\n # returning true keeps the timer event active\n return True\n\n\n def ca_timer_callback2(cookie):\n \"\"\"Callback for sending messages\n\n This callback is registered at the ECU timer event mechanism to be\n executed every 500ms.\n\n :param cookie:\n A cookie registered at 'add_timer'. May be None.\n \"\"\"\n # wait until we have our device_address\n if ca.state != j1939.ControllerApplication.State.NORMAL:\n # returning true keeps the timer event active\n return True\n\n # create data with 100 bytes\n data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100\n\n # sending multipacket message with TP-BAM\n ca.send_pgn(0, 0xFE, 0xF6, 6, data)\n\n # sending multipacket message with TP-CMDT, destination address is 0x05\n ca.send_pgn(0, 0xD0, 0x05, 6, data)\n\n # returning true keeps the timer event active\n return True\n\n def main():\n print(\"Initializing\")\n\n # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)\n ecu = j1939.ElectronicControlUnit()\n\n # Connect to the CAN bus\n # Arguments are passed to python-can's can.interface.Bus() constructor\n # (see https://python-can.readthedocs.io/en/stable/bus.html).\n # ecu.connect(bustype='socketcan', channel='can0')\n # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)\n ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)\n # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)\n # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)\n # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)\n # ecu.connect('testchannel_1', bustype='virtual')\n\n # add CA to the ECU\n ecu.add_ca(controller_application=ca)\n ca.subscribe(ca_receive)\n # callback every 0.5s\n ca.add_timer(0.500, ca_timer_callback1)\n # callback every 5s\n ca.add_timer(5, ca_timer_callback2)\n # by starting the CA it starts the address claiming procedure on the bus\n ca.start()\n\n time.sleep(120)\n\n print(\"Deinitializing\")\n ca.stop()\n ecu.disconnect()\n\n if __name__ == '__main__':\n main()\n\n\nCredits\n-------\nThis implementation was taken from https://github.com/juergenH87/python-can-j1939, as we needed to get information of the bus for the message that was received by the library.\n\nThanks for your great work Juergen :)!\n\n\n\n.. _python-can: https://python-can.readthedocs.org/en/stable/\n.. _Copperhill technologies: http://copperhilltech.com/a-brief-introduction-to-the-sae-j1939-protocol/\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Fork of SAE J1939 stack implementation, originally created by Juergen Heilgemeir",
"version": "1.0.4",
"project_urls": {
"Homepage": "https://github.com/ttpscmolinaf/python-can-j1939"
},
"split_keywords": [
"can",
"sae",
"j1939",
"j1939-fd",
"j1939-22"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "3fc371d4e8d29de75d0c126e396d3fe45ad6d6ad037ff349ac3588c6faa4814b",
"md5": "8c2c3c842693534e1860d1379b1d514c",
"sha256": "d6162642619980343d45a74a4ae8f2f54e9cc6b7b56fa4e48f4d1fe252fd8117"
},
"downloads": -1,
"filename": "can_j1939_ttpsc-1.0.4-py2.py3-none-any.whl",
"has_sig": false,
"md5_digest": "8c2c3c842693534e1860d1379b1d514c",
"packagetype": "bdist_wheel",
"python_version": "py2.py3",
"requires_python": null,
"size": 39492,
"upload_time": "2024-05-24T12:36:45",
"upload_time_iso_8601": "2024-05-24T12:36:45.535956Z",
"url": "https://files.pythonhosted.org/packages/3f/c3/71d4e8d29de75d0c126e396d3fe45ad6d6ad037ff349ac3588c6faa4814b/can_j1939_ttpsc-1.0.4-py2.py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "d3947e58869bf455eea425121139ae1488997ed11da26b849a41fa759b0d81da",
"md5": "e8b4fc9aec030722cc48edd75511c6df",
"sha256": "38cec689e6da0876178bd7795402d6ec8de71082b219da826509b7bbcdc20e9d"
},
"downloads": -1,
"filename": "can-j1939-ttpsc-1.0.4.tar.gz",
"has_sig": false,
"md5_digest": "e8b4fc9aec030722cc48edd75511c6df",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 36321,
"upload_time": "2024-05-24T12:36:48",
"upload_time_iso_8601": "2024-05-24T12:36:48.094988Z",
"url": "https://files.pythonhosted.org/packages/d3/94/7e58869bf455eea425121139ae1488997ed11da26b849a41fa759b0d81da/can-j1939-ttpsc-1.0.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-24 12:36:48",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "ttpscmolinaf",
"github_project": "python-can-j1939",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "can-j1939-ttpsc"
}