can-j1939-ttpsc


Name: can-j1939-ttpsc
Version: 1.0.2
Home page: https://github.com/ttpscmolinaf/python-can-j1939
Summary: Fork of SAE J1939 stack implementation, originally created by Juergen Heilgemeir
Upload time: 2023-10-25 12:52:23
Author: TTPSC
License: MIT
Keywords: can sae j1939 j1939-fd j1939-22
Requirements: No requirements were recorded.

Fork from TTPSC: SAE J1939 for Python
=====================================

|docs|

.. |docs| image:: https://readthedocs.org/projects/j1939/badge/?version=latest
   :target: https://j1939.readthedocs.io/en/stable/
   :alt: Documentation build Status

Overview
--------

This is a fork of the https://github.com/juergenH87/python-can-j1939 project. It adds the ability to get the origin of each received message, that is, information about the bus the message arrived on.
For more information, please check the readme file in the original author's project.

Installation
------------

Install can-j1939-ttpsc with pip::

    $ pip install can-j1939-ttpsc

or install from source::

    $ git clone https://github.com/ttpscmolinaf/python-can-j1939
    $ cd python-can-j1939
    $ pip install .

Upgrade
------------

Upgrade an already installed can-j1939-ttpsc package::

    $ pip install --upgrade can-j1939-ttpsc
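
The examples in this readme import the package as ``j1939``, while the distribution on PyPI is named ``can-j1939-ttpsc``, so the import alone does not show which distribution is installed. A minimal sketch for checking this with the standard library follows; the helper name ``installed_version`` is hypothetical and not part of this package.

```python
from importlib import metadata


def installed_version(distribution):
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Compare the fork with the upstream distribution name.
    for dist in ("can-j1939-ttpsc", "can-j1939"):
        print(dist, installed_version(dist))
```

If both names report a version, the two distributions may be competing for the same ``j1939`` import name, and uninstalling one of them is probably the safest fix.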


Quick start
-----------

To receive all global (broadcast) messages passing on the bus, subscribe to the ECU object.

.. code-block:: python

    import logging
    import time
    import can
    import j1939

    logging.getLogger('j1939').setLevel(logging.DEBUG)
    logging.getLogger('can').setLevel(logging.DEBUG)

    def on_message(priority, pgn, sa, timestamp, data):
        """Receive incoming messages from the bus

        :param int priority:
            Priority of the message
        :param int pgn:
            Parameter Group Number of the message
        :param int sa:
            Source Address of the message
        :param int timestamp:
            Timestamp of the message
        :param bytearray data:
            Data of the PDU
        """
        print("PGN {} length {}".format(pgn, len(data)))

    def main():
        print("Initializing")

        # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
        ecu = j1939.ElectronicControlUnit()

        # Connect to the CAN bus
        # Arguments are passed to python-can's can.interface.Bus() constructor
        # (see https://python-can.readthedocs.io/en/stable/bus.html).
        # ecu.connect(bustype='socketcan', channel='can0')
        # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
        ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
        # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
        # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
        # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)

        # subscribe to all (global) messages on the bus
        ecu.subscribe(on_message)

        time.sleep(120)

        print("Deinitializing")
        ecu.disconnect()

    if __name__ == '__main__':
        main()
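
The callback above only prints the PGN and payload length. As a rough sketch of what a more useful callback could look like, the example below filters for one PGN and decodes one value from the payload. It assumes the callback signature shown in the quick start (this fork may pass additional origin information) and uses the EEC1 layout from SAE J1939-71 (PGN 61444, engine speed in bytes 4-5, 0.125 rpm per bit). The names ``EEC1_PGN`` and ``decode_engine_speed`` are illustrative and not part of the library.

```python
import struct

EEC1_PGN = 61444  # 0xF004, Electronic Engine Controller 1 (assumed from SAE J1939-71)


def decode_engine_speed(data):
    """Return the engine speed in rpm from an EEC1 payload, or None if unavailable."""
    if len(data) < 5:
        return None
    # Engine speed occupies bytes 4-5 (1-based), little-endian, 0.125 rpm/bit.
    (raw,) = struct.unpack_from("<H", bytes(data), 3)
    if raw > 0xFAFF:
        # Values above 0xFAFF are reserved for indicator, error and not-available codes.
        return None
    return raw * 0.125


def on_message(priority, pgn, sa, timestamp, data):
    """Print the engine speed for EEC1 messages and ignore everything else."""
    if pgn != EEC1_PGN:
        return
    rpm = decode_engine_speed(data)
    if rpm is not None:
        print("SA {:#04x}: engine speed {:.1f} rpm".format(sa, rpm))
```

Such a callback would be registered exactly like the one in the quick start, with ``ecu.subscribe(on_message)``.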

A more sophisticated example, in which a ControllerApplication (CA) claims an address, subscribes to incoming messages and sends messages from timer callbacks:

.. code-block:: python

    import logging
    import time
    import can
    import j1939

    logging.getLogger('j1939').setLevel(logging.DEBUG)
    logging.getLogger('can').setLevel(logging.DEBUG)

    # compose the name descriptor for the new ca
    name = j1939.Name(
        arbitrary_address_capable=0,
        industry_group=j1939.Name.IndustryGroup.Industrial,
        vehicle_system_instance=1,
        vehicle_system=1,
        function=1,
        function_instance=1,
        ecu_instance=1,
        manufacturer_code=666,
        identity_number=1234567
        )

    # create the ControllerApplications
    ca = j1939.ControllerApplication(name, 128)


    def ca_receive(priority, pgn, source, timestamp, data):
        """Receive incoming messages for this CA (registered via ca.subscribe).

        :param int priority:
            Priority of the message
        :param int pgn:
            Parameter Group Number of the message
        :param int source:
            Source Address of the message
        :param int timestamp:
            Timestamp of the message
        :param bytearray data:
            Data of the PDU
        """
        print("PGN {} length {}".format(pgn, len(data)))

    def ca_timer_callback1(cookie):
        """Callback for sending messages

        This callback is registered at the ECU timer event mechanism to be
        executed every 500ms.

        :param cookie:
            A cookie registered at 'add_timer'. May be None.
        """
        # wait until we have our device_address
        if ca.state != j1939.ControllerApplication.State.NORMAL:
            # returning true keeps the timer event active
            return True

        # create data with 8 bytes
        data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 8

        # sending normal broadcast message
        ca.send_pgn(0, 0xFD, 0xED, 6, data)

        # sending normal peer-to-peer message, destination address is 0x04
        ca.send_pgn(0, 0xE0, 0x04, 6, data)

        # returning true keeps the timer event active
        return True


    def ca_timer_callback2(cookie):
        """Callback for sending messages

        This callback is registered at the ECU timer event mechanism to be
        executed every 5s.

        :param cookie:
            A cookie registered at 'add_timer'. May be None.
        """
        # wait until we have our device_address
        if ca.state != j1939.ControllerApplication.State.NORMAL:
            # returning true keeps the timer event active
            return True

        # create data with 100 bytes
        data = [j1939.ControllerApplication.FieldValue.NOT_AVAILABLE_8] * 100

        # sending multipacket message with TP-BAM
        ca.send_pgn(0, 0xFE, 0xF6, 6, data)

        # sending multipacket message with TP-CMDT, destination address is 0x05
        ca.send_pgn(0, 0xD0, 0x05, 6, data)

        # returning true keeps the timer event active
        return True

    def main():
        print("Initializing")

        # create the ElectronicControlUnit (one ECU can hold multiple ControllerApplications)
        ecu = j1939.ElectronicControlUnit()

        # Connect to the CAN bus
        # Arguments are passed to python-can's can.interface.Bus() constructor
        # (see https://python-can.readthedocs.io/en/stable/bus.html).
        # ecu.connect(bustype='socketcan', channel='can0')
        # ecu.connect(bustype='kvaser', channel=0, bitrate=250000)
        ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=250000)
        # ecu.connect(bustype='ixxat', channel=0, bitrate=250000)
        # ecu.connect(bustype='vector', app_name='CANalyzer', channel=0, bitrate=250000)
        # ecu.connect(bustype='nican', channel='CAN0', bitrate=250000)
        # ecu.connect('testchannel_1', bustype='virtual')

        # add CA to the ECU
        ecu.add_ca(controller_application=ca)
        ca.subscribe(ca_receive)
        # callback every 0.5s
        ca.add_timer(0.500, ca_timer_callback1)
        # callback every 5s
        ca.add_timer(5, ca_timer_callback2)
        # by starting the CA it starts the address claiming procedure on the bus
        ca.start()

        time.sleep(120)

        print("Deinitializing")
        ca.stop()
        ecu.disconnect()

    if __name__ == '__main__':
        main()
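
The ``send_pgn`` calls above differ only in their PDU format byte and payload length, yet they produce broadcast, peer-to-peer, TP-BAM and TP-CMDT traffic. The sketch below illustrates the J1939 rules behind that, assuming the first three ``send_pgn`` arguments are data page, PDU format and PDU specific, as the comments in the example suggest. The helper names are hypothetical and not part of the library.

```python
import math


def is_peer_to_peer(pdu_format):
    """PDU1 formats (below 240) carry a destination address in the PDU-specific byte."""
    return pdu_format < 240


def compose_pgn(data_page, pdu_format, pdu_specific):
    """Build the PGN from data page, PDU format and PDU specific.

    For PDU1 the PDU-specific byte is a destination address, not part of
    the PGN, so the low byte of the PGN is zero.
    """
    low_byte = 0 if is_peer_to_peer(pdu_format) else pdu_specific
    return (data_page << 16) | (pdu_format << 8) | low_byte


def tp_packet_count(length):
    """Number of transport-protocol data frames needed for a payload.

    Payloads of up to 8 bytes fit into a single CAN frame (0 TP frames);
    longer payloads are split into 7-byte segments.
    """
    if length <= 8:
        return 0
    return math.ceil(length / 7)


if __name__ == "__main__":
    for args in ((0, 0xFD, 0xED), (0, 0xE0, 0x04), (0, 0xFE, 0xF6), (0, 0xD0, 0x05)):
        kind = "peer-to-peer" if is_peer_to_peer(args[1]) else "broadcast"
        print("PGN {} ({})".format(compose_pgn(*args), kind))
    print("TP frames for 100 bytes:", tp_packet_count(100))
```

Under these rules the 100-byte payload in ``ca_timer_callback2`` needs 15 data frames, sent as a broadcast announce message for PDU format ``0xFE`` and as a connection-mode transfer to address ``0x05`` for PDU format ``0xD0``.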


Credits
-------
This implementation is forked from https://github.com/juergenH87/python-can-j1939, because we needed information about the bus on which each message was received by the library.

Thanks for your great work Juergen :)!




            
